author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-07 18:45:59 +0000
commit    19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree      42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rgw/rgw_sync_module_aws.cc
parent    Initial commit. (diff)
Adding upstream version 16.2.11+ds.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_sync_module_aws.cc')
-rw-r--r--  src/rgw/rgw_sync_module_aws.cc  | 1817
1 file changed, 1817 insertions(+), 0 deletions(-)
diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc
new file mode 100644
index 000000000..e57327b19
--- /dev/null
+++ b/src/rgw/rgw_sync_module_aws.cc
@@ -0,0 +1,1817 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "common/errno.h"
+
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_sync_module_aws.h"
+#include "rgw_cr_rados.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rest.h"
+#include "rgw_acl.h"
+#include "rgw_zone.h"
+
+#include "services/svc_zone.h"
+
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+
+#define DEFAULT_MULTIPART_SYNC_PART_SIZE (32 * 1024 * 1024)
+
+static string default_target_path = "rgw-${zonegroup}-${sid}/${bucket}";
+
+static string get_key_oid(const rgw_obj_key& key)
+{
+ string oid = key.name;
+ if (!key.instance.empty() &&
+ !key.have_null_instance()) {
+ oid += string(":") + key.instance;
+ }
+ return oid;
+}
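+// Illustrative: key{name="photo.jpg", instance="v1Abc"} yields
+// "photo.jpg:v1Abc"; an empty or null instance yields just "photo.jpg".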
+
+static string obj_to_aws_path(rgw::sal::RGWObject* obj)
+{
+ string path = obj->get_bucket()->get_name() + "/" + get_key_oid(obj->get_key());
+
+ return path;
+}
+
+/*
+
+ json configuration definition:
+
+ {
+ "connection": {
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+ "host_style": <path | virtual>,
+ },
+ "acls": [ { "type": <id | email | uri>,
+ "source_id": <source_id>,
+               "dest_id": <dest_id> } ... ],   # optional acl mappings; none applied if absent
+ "target_path": <target_path>, # override default
+
+    # anything below here is for non-trivial configuration
+    # and can be used in conjunction with the above
+
+ "default": {
+ "connection": {
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+        "host_style": <path | virtual>,
+ },
+    "acls": [ # list of source uids and how they map into destination uids in the dest object's acls
+ {
+ "type" : <id | email | uri>, # optional, default is id
+ "source_id": <id>,
+ "dest_id": <id>
+ } ... ]
+ "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
+ # final object name will be target_path + "/" + obj
+ },
+ "connections": [
+ {
+ "id": <id>,
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+ } ... ],
+ "acl_profiles": [
+ {
+ "id": <id>, # acl mappings
+ "acls": [ {
+ "type": <id | email | uri>,
+ "source_id": <id>,
+ "dest_id": <id>
+ } ... ]
+ }
+ ],
+ "profiles": [
+ {
+        "source_bucket": <source>,        # either a specific bucket name (foo) or a prefix (foo*)
+ "target_path": <dest>, # (override default)
+ "connection_id": <connection_id>, # optional, if empty references default connection
+ "acls_id": <mappings_id>, # optional, if empty references default mappings
+ } ... ],
+ }
+
+target path optional variables:
+
+(evaluated at init)
+sid: sync instance id, randomly generated by the sync process on first sync initialization
+zonegroup: zonegroup name
+zonegroup_id: zonegroup id
+zone: zone name
+zone_id: zone id
+
+(evaluated when syncing)
+bucket: bucket name
+owner: bucket owner
+
+*/
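+
+/*
+ illustrative minimal configuration (hypothetical values; the endpoint,
+ keys and acl ids below are placeholders, not module defaults):
+
+ {
+   "connection": {
+     "access_key": "AKIAEXAMPLE",
+     "secret": "s3cr3t",
+     "endpoint": "https://s3.amazonaws.com",
+     "host_style": "path"
+   },
+   "acls": [ { "type": "email",
+               "source_id": "jane@source.example",
+               "dest_id": "jane@dest.example" } ],
+   "target_path": "rgw-${zonegroup}-${sid}/${bucket}"
+ }
+
+ with this trivial form every source bucket syncs over the single root
+ connection, and an object lands at target_path + "/" + object-key once
+ the ${...} variables are expanded (see expand_target()/get_path() below).
+*/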
+
+struct ACLMapping {
+ ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
+ string source_id;
+ string dest_id;
+
+ ACLMapping() = default;
+
+ ACLMapping(ACLGranteeTypeEnum t,
+ const string& s,
+ const string& d) : type(t),
+ source_id(s),
+ dest_id(d) {}
+
+ void init(const JSONFormattable& config) {
+ const string& t = config["type"];
+
+ if (t == "email") {
+ type = ACL_TYPE_EMAIL_USER;
+ } else if (t == "uri") {
+ type = ACL_TYPE_GROUP;
+ } else {
+ type = ACL_TYPE_CANON_USER;
+ }
+
+ source_id = config["source_id"];
+ dest_id = config["dest_id"];
+ }
+
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ObjectSection os(jf, "acl_mapping");
+ string s;
+ switch (type) {
+ case ACL_TYPE_EMAIL_USER:
+ s = "email";
+ break;
+ case ACL_TYPE_GROUP:
+ s = "uri";
+ break;
+ default:
+ s = "id";
+ break;
+ }
+ encode_json("type", s, &jf);
+ encode_json("source_id", source_id, &jf);
+ encode_json("dest_id", dest_id, &jf);
+ }
+};
+
+struct ACLMappings {
+ map<string, ACLMapping> acl_mappings;
+
+ void init(const JSONFormattable& config) {
+ for (auto& c : config.array()) {
+ ACLMapping m;
+ m.init(c);
+
+ acl_mappings.emplace(std::make_pair(m.source_id, m));
+ }
+ }
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ArraySection os(jf, "acls");
+
+ for (auto& i : acl_mappings) {
+ i.second.dump_conf(cct, jf);
+ }
+ }
+};
+
+struct AWSSyncConfig_ACLProfiles {
+ map<string, std::shared_ptr<ACLMappings> > acl_profiles;
+
+ void init(const JSONFormattable& config) {
+ for (auto& c : config.array()) {
+ const string& profile_id = c["id"];
+
+ std::shared_ptr<ACLMappings> ap{new ACLMappings};
+ ap->init(c["acls"]);
+
+ acl_profiles[profile_id] = ap;
+ }
+ }
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ArraySection section(jf, "acl_profiles");
+
+ for (auto& p : acl_profiles) {
+ Formatter::ObjectSection section(jf, "profile");
+ encode_json("id", p.first, &jf);
+ p.second->dump_conf(cct, jf);
+ }
+ }
+
+ bool find(const string& profile_id, ACLMappings *result) const {
+ auto iter = acl_profiles.find(profile_id);
+ if (iter == acl_profiles.end()) {
+ return false;
+ }
+ *result = *iter->second;
+ return true;
+ }
+};
+
+struct AWSSyncConfig_Connection {
+ string connection_id;
+ string endpoint;
+ RGWAccessKey key;
+ HostStyle host_style{PathStyle};
+
+ bool has_endpoint{false};
+ bool has_key{false};
+ bool has_host_style{false};
+
+ void init(const JSONFormattable& config) {
+ has_endpoint = config.exists("endpoint");
+ has_key = config.exists("access_key") || config.exists("secret");
+ has_host_style = config.exists("host_style");
+
+ connection_id = config["id"];
+ endpoint = config["endpoint"];
+
+ key = RGWAccessKey(config["access_key"], config["secret"]);
+ string host_style_str = config["host_style"];
+ if (host_style_str != "virtual") {
+ host_style = PathStyle;
+ } else {
+ host_style = VirtualStyle;
+ }
+ }
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ObjectSection section(jf, "connection");
+ encode_json("id", connection_id, &jf);
+ encode_json("endpoint", endpoint, &jf);
+ string s = (host_style == PathStyle ? "path" : "virtual");
+ encode_json("host_style", s, &jf);
+
+ {
+ Formatter::ObjectSection os(jf, "key");
+ encode_json("access_key", key.id, &jf);
+ string secret = (key.key.empty() ? "" : "******");
+ encode_json("secret", secret, &jf);
+ }
+ }
+};
+
+static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
+{
+ string sval;
+ if (config.find(key, &sval)) {
+ string err;
+ uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
+ return -EINVAL;
+ }
+ *pval = val;
+ }
+ return 0;
+}
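+// Illustrative: a config of { "multipart_sync_threshold": "52428800" }
+// yields *pval = 52428800; a non-numeric string fails with -EINVAL, and an
+// absent key leaves *pval at its caller-supplied default.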
+
+struct AWSSyncConfig_S3 {
+ uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+ uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+
+ int init(CephContext *cct, const JSONFormattable& config) {
+ int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
+ if (r < 0) {
+ return r;
+ }
+
+ r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size);
+ if (r < 0) {
+ return r;
+ }
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
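+    // This floor matches the S3 API's documented 5 MiB minimum part size
+    // (all parts but the last); smaller configured values are clamped up.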
+ if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+ multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+ }
+ return 0;
+ }
+
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ObjectSection section(jf, "s3");
+ encode_json("multipart_sync_threshold", multipart_sync_threshold, &jf);
+ encode_json("multipart_min_part_size", multipart_min_part_size, &jf);
+ }
+};
+
+struct AWSSyncConfig_Profile {
+ string source_bucket;
+ bool prefix{false};
+ string target_path;
+ string connection_id;
+ string acls_id;
+
+ std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
+ std::shared_ptr<ACLMappings> acls;
+
+ std::shared_ptr<RGWRESTConn> conn;
+
+ void init(const JSONFormattable& config) {
+ source_bucket = config["source_bucket"];
+
+ prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');
+
+ if (prefix) {
+ source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
+ }
+
+ target_path = config["target_path"];
+ connection_id = config["connection_id"];
+ acls_id = config["acls_id"];
+
+ if (config.exists("connection")) {
+ conn_conf = make_shared<AWSSyncConfig_Connection>();
+ conn_conf->init(config["connection"]);
+ }
+
+ if (config.exists("acls")) {
+ acls = make_shared<ACLMappings>();
+ acls->init(config["acls"]);
+ }
+ }
+
+ void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
+ Formatter::ObjectSection config(jf, section);
+ string sb{source_bucket};
+ if (prefix) {
+ sb.append("*");
+ }
+ encode_json("source_bucket", sb, &jf);
+ encode_json("target_path", target_path, &jf);
+ encode_json("connection_id", connection_id, &jf);
+ encode_json("acls_id", acls_id, &jf);
+ if (conn_conf.get()) {
+ conn_conf->dump_conf(cct, jf);
+ }
+ if (acls.get()) {
+ acls->dump_conf(cct, jf);
+ }
+ }
+};
+
+static void find_and_replace(const string& src, const string& find, const string& replace, string *dest)
+{
+ string s = src;
+
+ size_t pos = s.find(find);
+ while (pos != string::npos) {
+ size_t next_ofs = pos + find.size();
+ s = s.substr(0, pos) + replace + s.substr(next_ofs);
+ pos = s.find(find, next_ofs);
+ }
+
+ *dest = s;
+}
+
+static void apply_meta_param(const string& src, const string& param, const string& val, string *dest)
+{
+ string s = string("${") + param + "}";
+ find_and_replace(src, s, val, dest);
+}
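+// Illustrative: apply_meta_param("rgw-${zonegroup}-${sid}/${bucket}",
+// "bucket", "foo", &dest) rewrites every "${bucket}" occurrence, giving
+// dest = "rgw-${zonegroup}-${sid}/foo"; the other variables are expanded
+// by separate calls (see expand_target() and get_path() below).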
+
+
+struct AWSSyncConfig {
+ AWSSyncConfig_Profile default_profile;
+ std::shared_ptr<AWSSyncConfig_Profile> root_profile;
+
+ map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
+ AWSSyncConfig_ACLProfiles acl_profiles;
+
+ map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;
+
+ AWSSyncConfig_S3 s3;
+
+ int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
+ bool connection_must_exist) {
+ if (!profile.connection_id.empty()) {
+ if (profile.conn_conf) {
+ ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
+ return -EINVAL;
+ }
+ if (connections.find(profile.connection_id) == connections.end()) {
+        ldout(cct, 0) << "ERROR: profile configuration references non-existent connection_id=" << profile.connection_id << dendl;
+ return -EINVAL;
+ }
+ profile.conn_conf = connections[profile.connection_id];
+ } else if (!profile.conn_conf) {
+ profile.connection_id = default_profile.connection_id;
+ auto i = connections.find(profile.connection_id);
+ if (i != connections.end()) {
+ profile.conn_conf = i->second;
+ }
+ }
+
+ if (connection_must_exist && !profile.conn_conf) {
+ ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
+ return -EINVAL;
+ }
+
+ if (profile.conn_conf && default_profile.conn_conf) {
+ if (!profile.conn_conf->has_endpoint) {
+ profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
+ }
+ if (!profile.conn_conf->has_host_style) {
+ profile.conn_conf->host_style = default_profile.conn_conf->host_style;
+ }
+ if (!profile.conn_conf->has_key) {
+ profile.conn_conf->key = default_profile.conn_conf->key;
+ }
+ }
+
+ ACLMappings acl_mappings;
+
+ if (!profile.acls_id.empty()) {
+ if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
+        ldout(cct, 0) << "ERROR: profile configuration references non-existent acls id=" << profile.acls_id << dendl;
+ return -EINVAL;
+ }
+ profile.acls = acl_profiles.acl_profiles[profile.acls_id];
+ } else if (!profile.acls) {
+ if (default_profile.acls) {
+ profile.acls = default_profile.acls;
+ profile.acls_id = default_profile.acls_id;
+ }
+ }
+
+ if (profile.target_path.empty()) {
+ profile.target_path = default_profile.target_path;
+ }
+ if (profile.target_path.empty()) {
+ profile.target_path = default_target_path;
+ }
+
+ return 0;
+ }
+
+ int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
+ std::shared_ptr<AWSSyncConfig_Profile> profile;
+ profile.reset(new AWSSyncConfig_Profile);
+ profile->init(profile_conf);
+
+ int ret = init_profile(cct, profile_conf, *profile, true);
+ if (ret < 0) {
+ return ret;
+ }
+
+ auto& sb = profile->source_bucket;
+
+ if (explicit_profiles.find(sb) != explicit_profiles.end()) {
+ ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
+ }
+
+ explicit_profiles[sb] = profile;
+ if (ptarget) {
+ *ptarget = profile;
+ }
+ return 0;
+ }
+
+  bool do_find_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
+ const string& name = bucket.name;
+ auto iter = explicit_profiles.upper_bound(name);
+ if (iter == explicit_profiles.begin()) {
+ return false;
+ }
+
+ --iter;
+ if (iter->first.size() > name.size()) {
+ return false;
+ }
+ if (name.compare(0, iter->first.size(), iter->first) != 0) {
+ return false;
+ }
+
+ std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;
+
+ if (!target->prefix &&
+ name.size() != iter->first.size()) {
+ return false;
+ }
+
+ *result = target;
+ return true;
+ }
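+  // Illustrative lookup, assuming explicit profiles for "foo" (exact) and
+  // "bar*" (prefix): bucket "foo" matches the exact entry, "barbell"
+  // matches the "bar" prefix entry, and "foobar" matches nothing because
+  // the "foo" entry is not a prefix profile, so find_profile() below falls
+  // back to root_profile.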
+
+  void find_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
+ if (!do_find_profile(bucket, result)) {
+ *result = root_profile;
+ }
+ }
+
+ AWSSyncConfig() {}
+
+ int init(CephContext *cct, const JSONFormattable& config) {
+ auto& default_conf = config["default"];
+
+    if (config.exists("default")) {
+      default_profile.init(default_conf);
+      int r = init_profile(cct, default_conf, default_profile, false);
+      if (r < 0) {
+        return r;
+      }
+    }
+
+ for (auto& conn : config["connections"].array()) {
+ auto new_conn = conn;
+
+ std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
+ c->init(new_conn);
+
+ connections[new_conn["id"]] = c;
+ }
+
+ acl_profiles.init(config["acl_profiles"]);
+
+ int r = s3.init(cct, config["s3"]);
+ if (r < 0) {
+ return r;
+ }
+
+ auto new_root_conf = config;
+
+ r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */
+ if (r < 0) {
+ return r;
+ }
+
+ for (auto target_conf : config["profiles"].array()) {
+ int r = init_target(cct, target_conf, nullptr);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ JSONFormatter jf(true);
+ dump_conf(cct, jf);
+ stringstream ss;
+ jf.flush(ss);
+
+ ldout(cct, 5) << "sync module config (parsed representation):\n" << ss.str() << dendl;
+
+ return 0;
+ }
+
+ void expand_target(RGWDataSyncCtx *sc, const string& sid, const string& path, string *dest) {
+ apply_meta_param(path, "sid", sid, dest);
+
+ const RGWZoneGroup& zg = sc->env->svc->zone->get_zonegroup();
+ apply_meta_param(path, "zonegroup", zg.get_name(), dest);
+ apply_meta_param(path, "zonegroup_id", zg.get_id(), dest);
+
+ const RGWZone& zone = sc->env->svc->zone->get_zone();
+ apply_meta_param(path, "zone", zone.name, dest);
+ apply_meta_param(path, "zone_id", zone.id, dest);
+ }
+
+ void update_config(RGWDataSyncCtx *sc, const string& sid) {
+ expand_target(sc, sid, root_profile->target_path, &root_profile->target_path);
+ ldout(sc->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
+ for (auto& t : explicit_profiles) {
+ expand_target(sc, sid, t.second->target_path, &t.second->target_path);
+ ldout(sc->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
+ }
+ }
+
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ObjectSection config(jf, "config");
+ root_profile->dump_conf(cct, jf);
+ jf.open_array_section("connections");
+ for (auto c : connections) {
+ c.second->dump_conf(cct, jf);
+ }
+ jf.close_section();
+
+ acl_profiles.dump_conf(cct, jf);
+
+ { // targets
+ Formatter::ArraySection as(jf, "profiles");
+ for (auto& t : explicit_profiles) {
+ Formatter::ObjectSection target_section(jf, "profile");
+ encode_json("name", t.first, &jf);
+ t.second->dump_conf(cct, jf);
+ }
+ }
+ }
+
+ string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
+ const RGWBucketInfo& bucket_info,
+ const rgw_obj_key& obj) {
+ string bucket_str;
+ string owner;
+ if (!bucket_info.owner.tenant.empty()) {
+ bucket_str = owner = bucket_info.owner.tenant + "-";
+ owner += bucket_info.owner.id;
+ }
+ bucket_str += bucket_info.bucket.name;
+
+ const string& path = profile->target_path;
+
+ string new_path;
+ apply_meta_param(path, "bucket", bucket_str, &new_path);
+ apply_meta_param(new_path, "owner", owner, &new_path);
+
+ new_path += string("/") + get_key_oid(obj);
+
+ return new_path;
+ }
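+  // Illustrative: with an already-expanded target_path of
+  // "rgw-us-1a2b3c4d/${bucket}" (hypothetical zonegroup/sid values),
+  // bucket "foo" with no tenant and object key "obj1" produce
+  // "rgw-us-1a2b3c4d/foo/obj1"; get_target() then splits on the first '/'
+  // into destination bucket and object names.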
+
+ void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
+ const RGWBucketInfo& bucket_info,
+ const rgw_obj_key& obj,
+ string *bucket_name,
+ string *obj_name) {
+ string path = get_path(profile, bucket_info, obj);
+ size_t pos = path.find('/');
+
+ *bucket_name = path.substr(0, pos);
+ *obj_name = path.substr(pos + 1);
+ }
+
+ void init_conns(RGWDataSyncCtx *sc, const string& id) {
+ auto sync_env = sc->env;
+
+ update_config(sc, id);
+
+ auto& root_conf = root_profile->conn_conf;
+
+ root_profile->conn.reset(new S3RESTConn(sc->cct,
+ sync_env->svc->zone,
+ id,
+ { root_conf->endpoint },
+ root_conf->key,
+ root_conf->host_style));
+
+ for (auto i : explicit_profiles) {
+ auto& c = i.second;
+
+ c->conn.reset(new S3RESTConn(sc->cct,
+ sync_env->svc->zone,
+ id,
+ { c->conn_conf->endpoint },
+ c->conn_conf->key,
+ c->conn_conf->host_style));
+ }
+ }
+};
+
+
+struct AWSSyncInstanceEnv {
+ AWSSyncConfig conf;
+ string id;
+
+ explicit AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}
+
+ void init(RGWDataSyncCtx *sc, uint64_t instance_id) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
+ id = buf;
+
+ conf.init_conns(sc, id);
+ }
+
+ void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
+ conf.find_profile(bucket, ptarget);
+ ceph_assert(ptarget);
+ }
+};
+
+static int do_decode_rest_obj(CephContext *cct, map<string, bufferlist>& attrs, map<string, string>& headers, rgw_rest_obj *info)
+{
+ for (auto header : headers) {
+ const string& val = header.second;
+ if (header.first == "RGWX_OBJECT_SIZE") {
+      info->content_len = atoll(val.c_str()); // 64-bit: object sizes can exceed INT_MAX
+ } else {
+ info->attrs[header.first] = val;
+ }
+ }
+
+ info->acls.set_ctx(cct);
+ auto aiter = attrs.find(RGW_ATTR_ACL);
+ if (aiter != attrs.end()) {
+ bufferlist& bl = aiter->second;
+ auto bliter = bl.cbegin();
+ try {
+ info->acls.decode(bliter);
+ } catch (buffer::error& err) {
+ ldout(cct, 0) << "ERROR: failed to decode policy off attrs" << dendl;
+ return -EIO;
+ }
+ } else {
+ ldout(cct, 0) << "WARNING: acl attrs not provided" << dendl;
+ }
+
+ return 0;
+}
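+// Illustrative: given headers { "RGWX_OBJECT_SIZE": "1024",
+// "CONTENT_TYPE": "text/plain" }, the result has content_len = 1024 and
+// attrs["CONTENT_TYPE"] = "text/plain"; the ACL policy is decoded from
+// the RGW_ATTR_ACL attribute when present.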
+
+class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
+{
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *conn;
+ rgw::sal::RGWObject* src_obj;
+ RGWRESTConn::get_obj_params req_params;
+
+ rgw_sync_aws_src_obj_properties src_properties;
+public:
+ RGWRESTStreamGetCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWDataSyncCtx *_sc,
+ RGWRESTConn *_conn,
+ rgw::sal::RGWObject* _src_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller,
+ _sc->env->http_manager, _src_obj->get_key()),
+ sc(_sc), conn(_conn), src_obj(_src_obj),
+ src_properties(_src_properties) {
+ }
+
+ int init(const DoutPrefixProvider *dpp) override {
+ /* init input connection */
+
+ req_params.get_op = true;
+ req_params.prepend_metadata = true;
+
+ req_params.unmod_ptr = &src_properties.mtime;
+ req_params.etag = src_properties.etag;
+ req_params.mod_zone_id = src_properties.zone_short_id;
+ req_params.mod_pg_ver = src_properties.pg_ver;
+
+ if (range.is_set) {
+ req_params.range_is_set = true;
+ req_params.range_start = range.ofs;
+ req_params.range_end = range.ofs + range.size - 1;
+ }
+
+ RGWRESTStreamRWRequest *in_req;
+ int ret = conn->get_obj(dpp, src_obj, req_params, false /* send */, &in_req);
+ if (ret < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): conn->get_obj() returned ret=" << ret << dendl;
+ return ret;
+ }
+
+ set_req(in_req);
+
+ return RGWStreamReadHTTPResourceCRF::init(dpp);
+ }
+
+ int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override {
+ map<string, bufferlist> src_attrs;
+
+ ldout(sc->cct, 20) << __func__ << ":" << " headers=" << headers << " extra_data.length()=" << extra_data.length() << dendl;
+
+ if (extra_data.length() > 0) {
+ JSONParser jp;
+ if (!jp.parse(extra_data.c_str(), extra_data.length())) {
+ ldout(sc->cct, 0) << "ERROR: failed to parse response extra data. len=" << extra_data.length() << " data=" << extra_data.c_str() << dendl;
+ return -EIO;
+ }
+
+ JSONDecoder::decode_json("attrs", src_attrs, &jp);
+ }
+ return do_decode_rest_obj(sc->cct, src_attrs, headers, &rest_obj);
+ }
+
+ bool need_extra_data() override {
+ return true;
+ }
+};
+
+static std::set<string> keep_headers = { "CONTENT_TYPE",
+ "CONTENT_ENCODING",
+ "CONTENT_DISPOSITION",
+ "CONTENT_LANGUAGE" };
+
+class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
+{
+ RGWDataSyncCtx *sc;
+ rgw_sync_aws_src_obj_properties src_properties;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ rgw::sal::RGWObject* dest_obj;
+ string etag;
+public:
+ RGWAWSStreamPutCRF(CephContext *_cct,
+ RGWCoroutinesEnv *_env,
+ RGWCoroutine *_caller,
+ RGWDataSyncCtx *_sc,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
+ rgw::sal::RGWObject* _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sc->env->http_manager),
+ sc(_sc), src_properties(_src_properties), target(_target), dest_obj(_dest_obj) {
+ }
+
+ int init() override {
+ /* init output connection */
+ RGWRESTStreamS3PutObj *out_req{nullptr};
+
+ if (multipart.is_multipart) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", multipart.part_num);
+ rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
+ { "partNumber", buf },
+ { nullptr, nullptr } };
+ target->conn->put_obj_send_init(dest_obj, params, &out_req);
+ } else {
+ target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
+ }
+
+ set_req(out_req);
+
+ return RGWStreamWriteHTTPResourceCRF::init();
+ }
+
+ static bool keep_attr(const string& h) {
+ return (keep_headers.find(h) != keep_headers.end() ||
+ boost::algorithm::starts_with(h, "X_AMZ_"));
+ }
+
+ static void init_send_attrs(CephContext *cct,
+ const rgw_rest_obj& rest_obj,
+ const rgw_sync_aws_src_obj_properties& src_properties,
+ const AWSSyncConfig_Profile *target,
+ map<string, string> *attrs) {
+ auto& new_attrs = *attrs;
+
+ new_attrs.clear();
+
+ for (auto& hi : rest_obj.attrs) {
+ if (keep_attr(hi.first)) {
+ new_attrs.insert(hi);
+ }
+ }
+
+ auto acl = rest_obj.acls.get_acl();
+
+ map<int, vector<string> > access_map;
+
+ if (target->acls) {
+ for (auto& grant : acl.get_grant_map()) {
+ auto& orig_grantee = grant.first;
+ auto& perm = grant.second;
+
+ string grantee;
+
+ const auto& am = target->acls->acl_mappings;
+
+ auto iter = am.find(orig_grantee);
+ if (iter == am.end()) {
+ ldout(cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
+ continue;
+ }
+
+ grantee = iter->second.dest_id;
+
+ string type;
+
+ switch (iter->second.type) {
+ case ACL_TYPE_CANON_USER:
+ type = "id";
+ break;
+ case ACL_TYPE_EMAIL_USER:
+ type = "emailAddress";
+ break;
+ case ACL_TYPE_GROUP:
+ type = "uri";
+ break;
+ default:
+ continue;
+ }
+
+ string tv = type + "=" + grantee;
+
+ int flags = perm.get_permission().get_permissions();
+ if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
+ access_map[flags].push_back(tv);
+ continue;
+ }
+
+ for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
+ if (flags & i) {
+ access_map[i].push_back(tv);
+ }
+ }
+ }
+ }
+
+ for (auto aiter : access_map) {
+ int grant_type = aiter.first;
+
+ string header_str("x-amz-grant-");
+
+ switch (grant_type) {
+ case RGW_PERM_READ:
+ header_str.append("read");
+ break;
+ case RGW_PERM_WRITE:
+ header_str.append("write");
+ break;
+ case RGW_PERM_READ_ACP:
+ header_str.append("read-acp");
+ break;
+ case RGW_PERM_WRITE_ACP:
+ header_str.append("write-acp");
+ break;
+ case RGW_PERM_FULL_CONTROL:
+ header_str.append("full-control");
+ break;
+ }
+
+ string s;
+
+ for (auto viter : aiter.second) {
+ if (!s.empty()) {
+ s.append(", ");
+ }
+ s.append(viter);
+ }
+
+ ldout(cct, 20) << "acl_mappings: set acl: " << header_str << "=" << s << dendl;
+
+ new_attrs[header_str] = s;
+ }
+
+ char buf[32];
+    snprintf(buf, sizeof(buf), "%llu", (unsigned long long)src_properties.versioned_epoch);
+ new_attrs["x-amz-meta-rgwx-versioned-epoch"] = buf;
+
+ utime_t ut(src_properties.mtime);
+ snprintf(buf, sizeof(buf), "%lld.%09lld",
+ (long long)ut.sec(),
+ (long long)ut.nsec());
+
+ new_attrs["x-amz-meta-rgwx-source-mtime"] = buf;
+ new_attrs["x-amz-meta-rgwx-source-etag"] = src_properties.etag;
+ new_attrs["x-amz-meta-rgwx-source-key"] = rest_obj.key.name;
+ if (!rest_obj.key.instance.empty()) {
+ new_attrs["x-amz-meta-rgwx-source-version-id"] = rest_obj.key.instance;
+ }
+ }
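+  // Illustrative result, assuming an acl mapping of canonical user "alice"
+  // to "alice-dest" holding FULL_CONTROL on the source object: attrs gains
+  // "x-amz-grant-full-control" = "id=alice-dest" alongside the
+  // x-amz-meta-rgwx-* provenance headers set above.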
+
+ void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override {
+ RGWRESTStreamS3PutObj *r = static_cast<RGWRESTStreamS3PutObj *>(req);
+
+ map<string, string> new_attrs;
+ if (!multipart.is_multipart) {
+ init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
+ }
+
+ r->set_send_length(rest_obj.content_len);
+
+ RGWAccessControlPolicy policy;
+
+ r->send_ready(dpp, target->conn->get_key(), new_attrs, policy, false);
+ }
+
+ void handle_headers(const map<string, string>& headers) {
+ for (auto h : headers) {
+ if (h.first == "ETAG") {
+ etag = h.second;
+ }
+ }
+ }
+
+ bool get_etag(string *petag) {
+ if (etag.empty()) {
+ return false;
+ }
+ *petag = etag;
+ return true;
+ }
+};
+
+
+class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *source_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ rgw::sal::RGWObject* src_obj;
+ rgw::sal::RGWObject* dest_obj;
+
+ rgw_sync_aws_src_obj_properties src_properties;
+
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+public:
+ RGWAWSStreamObjToCloudPlainCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_source_conn,
+ rgw::sal::RGWObject* _src_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
+ std::shared_ptr<AWSSyncConfig_Profile> _target,
+ rgw::sal::RGWObject* _dest_obj) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ source_conn(_source_conn),
+ target(_target),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj),
+ src_properties(_src_properties) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
+ source_conn, src_obj,
+ src_properties));
+
+ /* init output */
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
+ src_properties, target, dest_obj));
+
+ yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *source_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ rgw::sal::RGWObject* src_obj;
+ rgw::sal::RGWObject* dest_obj;
+
+ rgw_sync_aws_src_obj_properties src_properties;
+
+ string upload_id;
+
+ rgw_sync_aws_multipart_part_info part_info;
+
+ std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
+ std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
+
+ string *petag;
+
+public:
+ RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_source_conn,
+ rgw::sal::RGWObject* _src_obj,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
+ rgw::sal::RGWObject* _dest_obj,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
+ const string& _upload_id,
+ const rgw_sync_aws_multipart_part_info& _part_info,
+ string *_petag) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ source_conn(_source_conn),
+ target(_target),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj),
+ src_properties(_src_properties),
+ upload_id(_upload_id),
+ part_info(_part_info),
+ petag(_petag) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ /* init input */
+ in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sc,
+ source_conn, src_obj,
+ src_properties));
+
+ in_crf->set_range(part_info.ofs, part_info.size);
+
+ /* init output */
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sc,
+ src_properties, target, dest_obj));
+
+ out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
+
+ yield call(new RGWStreamSpliceCR(cct, sc->env->http_manager, in_crf, out_crf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ if (!(static_cast<RGWAWSStreamPutCRF *>(out_crf.get()))->get_etag(petag)) {
+ ldout(sc->cct, 0) << "ERROR: failed to get etag from PUT request" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSAbortMultipartCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *dest_conn;
+ rgw::sal::RGWObject* dest_obj;
+
+ string upload_id;
+
+public:
+ RGWAWSAbortMultipartCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_dest_conn,
+ rgw::sal::RGWObject* _dest_obj,
+ const string& _upload_id) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ upload_id(_upload_id) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+
+ yield {
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+ bufferlist bl;
+ call(new RGWDeleteRESTResourceCR(sc->cct, dest_conn, sc->env->http_manager,
+ obj_to_aws_path(dest_obj), params));
+ }
+
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload for dest object=" << dest_obj << " (retcode=" << retcode << ")" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSInitMultipartCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *dest_conn;
+ rgw::sal::RGWObject* dest_obj;
+
+ uint64_t obj_size;
+ map<string, string> attrs;
+
+ bufferlist out_bl;
+
+ string *upload_id;
+
+ struct InitMultipartResult {
+ string bucket;
+ string key;
+ string upload_id;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+ RGWXMLDecoder::decode_xml("Key", key, obj);
+ RGWXMLDecoder::decode_xml("UploadId", upload_id, obj);
+ }
+ } result;
+
+public:
+ RGWAWSInitMultipartCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_dest_conn,
+ rgw::sal::RGWObject* _dest_obj,
+ uint64_t _obj_size,
+ const map<string, string>& _attrs,
+ string *_upload_id) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ obj_size(_obj_size),
+ attrs(_attrs),
+ upload_id(_upload_id) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+
+ yield {
+ rgw_http_param_pair params[] = { { "uploads", nullptr }, {nullptr, nullptr} };
+ bufferlist bl;
+ call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
+ obj_to_aws_path(dest_obj), params, &attrs, bl, &out_bl));
+ }
+
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to initialize multipart upload for dest object=" << dest_obj << dendl;
+ return set_cr_error(retcode);
+ }
+ {
+        /*
+         * If any of the following fails we cannot abort the upload, as we
+         * cannot extract the upload id; if that happens it is very likely
+         * the least of our problems.
+         */
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+ ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart init response from server" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ try {
+ RGWXMLDecoder::decode_xml("InitiateMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+ }
+
+ ldpp_dout(dpp, 20) << "init multipart result: bucket=" << result.bucket << " key=" << result.key << " upload_id=" << result.upload_id << dendl;
+
+ *upload_id = result.upload_id;
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSCompleteMultipartCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *dest_conn;
+ rgw::sal::RGWObject* dest_obj;
+
+ bufferlist out_bl;
+
+ string upload_id;
+
+ struct CompleteMultipartReq {
+ map<int, rgw_sync_aws_multipart_part_info> parts;
+
+ explicit CompleteMultipartReq(const map<int, rgw_sync_aws_multipart_part_info>& _parts) : parts(_parts) {}
+
+ void dump_xml(Formatter *f) const {
+ for (auto p : parts) {
+ f->open_object_section("Part");
+ encode_xml("PartNumber", p.first, f);
+ encode_xml("ETag", p.second.etag, f);
+ f->close_section();
+ };
+ }
+ } req_enc;
+
+ struct CompleteMultipartResult {
+ string location;
+ string bucket;
+ string key;
+ string etag;
+
+ void decode_xml(XMLObj *obj) {
+      RGWXMLDecoder::decode_xml("Location", location, obj);
+ RGWXMLDecoder::decode_xml("Bucket", bucket, obj);
+ RGWXMLDecoder::decode_xml("Key", key, obj);
+ RGWXMLDecoder::decode_xml("ETag", etag, obj);
+ }
+ } result;
+
+public:
+ RGWAWSCompleteMultipartCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_dest_conn,
+ rgw::sal::RGWObject* _dest_obj,
+ string _upload_id,
+ const map<int, rgw_sync_aws_multipart_part_info>& _parts) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ upload_id(_upload_id),
+ req_enc(_parts) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+
+ yield {
+ rgw_http_param_pair params[] = { { "uploadId", upload_id.c_str() }, {nullptr, nullptr} };
+ stringstream ss;
+ XMLFormatter formatter;
+
+ encode_xml("CompleteMultipartUpload", req_enc, &formatter);
+
+ formatter.flush(ss);
+
+ bufferlist bl;
+ bl.append(ss.str());
+
+ call(new RGWPostRawRESTResourceCR <bufferlist> (sc->cct, dest_conn, sc->env->http_manager,
+ obj_to_aws_path(dest_obj), params, nullptr, bl, &out_bl));
+ }
+
+ if (retcode < 0) {
+        ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload for dest object=" << dest_obj << dendl;
+ return set_cr_error(retcode);
+ }
+ {
+      /*
+       * If decoding the response fails there is nothing left to roll back:
+       * the upload has already completed on the remote end, so a parse
+       * failure here is very likely the least of our problems.
+       */
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+        ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing multipart complete response from server" << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+
+ try {
+ RGWXMLDecoder::decode_xml("CompleteMultipartUploadResult", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(-EIO);
+ }
+ }
+
+ ldpp_dout(dpp, 20) << "complete multipart result: location=" << result.location << " bucket=" << result.bucket << " key=" << result.key << " etag=" << result.etag << dendl;
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+
+class RGWAWSStreamAbortMultipartUploadCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWRESTConn *dest_conn;
+ rgw::sal::RGWObject* dest_obj;
+ const rgw_raw_obj status_obj;
+
+ string upload_id;
+
+public:
+
+ RGWAWSStreamAbortMultipartUploadCR(RGWDataSyncCtx *_sc,
+ RGWRESTConn *_dest_conn,
+ rgw::sal::RGWObject* _dest_obj,
+ const rgw_raw_obj& _status_obj,
+ const string& _upload_id) : RGWCoroutine(_sc->cct), sc(_sc),
+ dest_conn(_dest_conn),
+ dest_obj(_dest_obj),
+ status_obj(_status_obj),
+ upload_id(_upload_id) {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ yield call(new RGWAWSAbortMultipartCR(sc, dest_conn, dest_obj, upload_id));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to abort multipart upload dest obj=" << dest_obj << " upload_id=" << upload_id << " retcode=" << retcode << dendl;
+ /* ignore error, best effort */
+ }
+ yield call(new RGWRadosRemoveCR(sc->env->store, status_obj));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj obj=" << status_obj << " retcode=" << retcode << dendl;
+ /* ignore error, best effort */
+ }
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ AWSSyncConfig& conf;
+ RGWRESTConn *source_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ rgw::sal::RGWObject* src_obj;
+ rgw::sal::RGWObject* dest_obj;
+
+ uint64_t obj_size;
+ string src_etag;
+ rgw_sync_aws_src_obj_properties src_properties;
+ rgw_rest_obj rest_obj;
+
+ rgw_sync_aws_multipart_upload_info status;
+
+ map<string, string> new_attrs;
+
+ rgw_sync_aws_multipart_part_info *pcur_part_info{nullptr};
+
+ int ret_err{0};
+
+ rgw_raw_obj status_obj;
+
+public:
+ RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe,
+ AWSSyncConfig& _conf,
+ RGWRESTConn *_source_conn,
+ rgw::sal::RGWObject* _src_obj,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
+ rgw::sal::RGWObject* _dest_obj,
+ uint64_t _obj_size,
+ const rgw_sync_aws_src_obj_properties& _src_properties,
+ const rgw_rest_obj& _rest_obj) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ sync_env(_sc->env),
+ conf(_conf),
+ source_conn(_source_conn),
+ target(_target),
+ src_obj(_src_obj),
+ dest_obj(_dest_obj),
+ obj_size(_obj_size),
+ src_properties(_src_properties),
+ rest_obj(_rest_obj),
+ status_obj(sync_env->svc->zone->get_zone_params().log_pool,
+ RGWBucketPipeSyncStatusManager::obj_status_oid(_sync_pipe, sc->source_zone, src_obj)) {
+ }
+
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ yield call(new RGWSimpleRadosReadCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ status_obj, &status, false));
+
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to read sync status of object " << src_obj << " retcode=" << retcode << dendl;
+ return retcode;
+ }
+
+ if (retcode >= 0) {
+ /* check here that mtime and size did not change */
+
+ if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
+ status.src_properties.etag != src_properties.etag) {
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
+ retcode = -ENOENT;
+ }
+ }
+
+ if (retcode == -ENOENT) {
+ RGWAWSStreamPutCRF::init_send_attrs(sc->cct, rest_obj, src_properties, target.get(), &new_attrs);
+
+        yield call(new RGWAWSInitMultipartCR(sc, target->conn.get(), dest_obj, obj_size, std::move(new_attrs), &status.upload_id));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ status.obj_size = obj_size;
+ status.src_properties = src_properties;
+#define MULTIPART_MAX_PARTS 10000
+ uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
+ status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
+ status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
+ status.cur_part = 1;
+ }
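+      // Worked example (hypothetical sizes): a 1 GiB object with the
+      // default 32 MiB multipart_min_part_size gives min_part_size =
+      // 1 GiB / 10000 ≈ 107 KB, so part_size stays at 32 MiB and
+      // num_parts = (1 GiB + 32 MiB - 1) / 32 MiB = 32, uploaded in order
+      // by the loop below.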
+
+ for (; (uint32_t)status.cur_part <= status.num_parts; ++status.cur_part) {
+ yield {
+ rgw_sync_aws_multipart_part_info& cur_part_info = status.parts[status.cur_part];
+ cur_part_info.part_num = status.cur_part;
+ cur_part_info.ofs = status.cur_ofs;
+ cur_part_info.size = std::min((uint64_t)status.part_size, status.obj_size - status.cur_ofs);
+
+ pcur_part_info = &cur_part_info;
+
+ status.cur_ofs += status.part_size;
+
+ call(new RGWAWSStreamObjToCloudMultipartPartCR(sc,
+ source_conn, src_obj,
+ target,
+ dest_obj,
+ status.src_properties,
+ status.upload_id,
+ cur_part_info,
+ &cur_part_info.etag));
+ }
+
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+ ret_err = retcode;
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
+ return set_cr_error(ret_err);
+ }
+
+ yield call(new RGWSimpleRadosWriteCR<rgw_sync_aws_multipart_upload_info>(dpp, sync_env->async_rados, sync_env->svc->sysobj, status_obj, status));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to store multipart upload state, retcode=" << retcode << dendl;
+ /* continue with upload anyway */
+ }
+ ldout(sc->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
+ }
+
+ yield call(new RGWAWSCompleteMultipartCR(sc, target->conn.get(), dest_obj, status.upload_id, status.parts));
+ if (retcode < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
+ ret_err = retcode;
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sc, target->conn.get(), dest_obj, status_obj, status.upload_id));
+ return set_cr_error(ret_err);
+ }
+
+ /* remove status obj */
+ yield call(new RGWRadosRemoveCR(sync_env->store, status_obj));
+ if (retcode < 0) {
+        ldpp_dout(dpp, 0) << "ERROR: failed to remove sync status obj=" << status_obj << " upload_id=" << status.upload_id << " (" << cpp_strerror(-retcode) << ")" << dendl;
+ /* ignore error, best effort */
+ }
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+template <class T>
+int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
+{
+ map<string, bufferlist>::iterator iter = attrs.find(attr_name);
+ if (iter == attrs.end()) {
+ *result = def_val;
+ return 0;
+ }
+ bufferlist& bl = iter->second;
+ if (bl.length() == 0) {
+ *result = def_val;
+ return 0;
+ }
+ auto bliter = bl.cbegin();
+ try {
+ decode(*result, bliter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ return 0;
+}
+
+// maybe use Fetch Remote Obj instead?
+class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
+ rgw_bucket_sync_pipe sync_pipe;
+ AWSSyncInstanceEnv& instance;
+
+ uint64_t versioned_epoch{0};
+
+ RGWRESTConn *source_conn{nullptr};
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ bufferlist res;
+ unordered_map <string, bool> bucket_created;
+ string target_bucket_name;
+ string target_obj_name;
+ rgw_rest_obj rest_obj;
+ int ret{0};
+
+ uint32_t src_zone_short_id{0};
+ uint64_t src_pg_ver{0};
+
+ bufferlist out_bl;
+
+ struct CreateBucketResult {
+ string code;
+
+ void decode_xml(XMLObj *obj) {
+ RGWXMLDecoder::decode_xml("Code", code, obj);
+ }
+ } result;
+
+public:
+ RGWAWSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe,
+ rgw_obj_key& _key,
+ AWSSyncInstanceEnv& _instance,
+ uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
+ instance(_instance), versioned_epoch(_versioned_epoch)
+ {}
+
+ ~RGWAWSHandleRemoteObjCBCR(){
+ }
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
+ if (ret < 0) {
+ ldout(sc->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
+ } else {
+ ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
+ if (ret < 0) {
+ ldout(sc->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
+ src_pg_ver = 0; /* all or nothing */
+ }
+ }
+ ldpp_dout(dpp, 4) << "AWS: download begin: z=" << sc->source_zone
+ << " b=" << src_bucket << " k=" << key << " size=" << size
+ << " mtime=" << mtime << " etag=" << etag
+ << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
+ << dendl;
+
+ source_conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
+ if (!source_conn) {
+ ldout(sc->cct, 0) << "ERROR: cannot find http connection to zone " << sc->source_zone << dendl;
+ return set_cr_error(-EINVAL);
+ }
+
+ instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
+ instance.conf.get_target(target, sync_pipe.dest_bucket_info, key, &target_bucket_name, &target_obj_name);
+
+ if (bucket_created.find(target_bucket_name) == bucket_created.end()){
+ yield {
+          ldout(sc->cct, 0) << "AWS: creating bucket " << target_bucket_name << dendl;
+ bufferlist bl;
+ call(new RGWPutRawRESTResourceCR <bufferlist> (sc->cct, target->conn.get(),
+ sync_env->http_manager,
+ target_bucket_name, nullptr, bl, &out_bl));
+ }
+ if (retcode < 0 ) {
+ RGWXMLDecoder::XMLParser parser;
+ if (!parser.init()) {
+            ldout(sc->cct, 0) << "ERROR: failed to initialize xml parser for parsing create bucket response from server" << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (!parser.parse(out_bl.c_str(), out_bl.length(), 1)) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: failed to parse xml: " << str << dendl;
+ return set_cr_error(retcode);
+ }
+
+ try {
+ RGWXMLDecoder::decode_xml("Error", result, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ string str(out_bl.c_str(), out_bl.length());
+ ldout(sc->cct, 5) << "ERROR: unexpected xml: " << str << dendl;
+ return set_cr_error(retcode);
+ }
+
+ if (result.code != "BucketAlreadyOwnedByYou") {
+ return set_cr_error(retcode);
+ }
+ }
+
+ bucket_created[target_bucket_name] = true;
+ }
+
+ yield {
+ rgw::sal::RGWRadosBucket bucket(sync_env->store, src_bucket);
+ rgw::sal::RGWRadosObject src_obj(sync_env->store, key, &bucket);
+
+ /* init output */
+ rgw_bucket target_bucket;
+ target_bucket.name = target_bucket_name; /* this is only possible because we only use bucket name for
+ uri resolution */
+ rgw::sal::RGWRadosBucket dest_bucket(sync_env->store, target_bucket);
+ rgw::sal::RGWRadosObject dest_obj(sync_env->store, rgw_obj_key(target_obj_name), &dest_bucket);
+
+ rgw_sync_aws_src_obj_properties src_properties;
+ src_properties.mtime = mtime;
+ src_properties.etag = etag;
+ src_properties.zone_short_id = src_zone_short_id;
+ src_properties.pg_ver = src_pg_ver;
+ src_properties.versioned_epoch = versioned_epoch;
+
+ if (size < instance.conf.s3.multipart_sync_threshold) {
+ call(new RGWAWSStreamObjToCloudPlainCR(sc, source_conn, &src_obj,
+ src_properties,
+ target,
+ &dest_obj));
+ } else {
+ rgw_rest_obj rest_obj;
+ rest_obj.init(key);
+ if (do_decode_rest_obj(sc->cct, attrs, headers, &rest_obj)) {
+ ldout(sc->cct, 0) << "ERROR: failed to decode rest obj out of headers=" << headers << ", attrs=" << attrs << dendl;
+ return set_cr_error(-EINVAL);
+ }
+ call(new RGWAWSStreamObjToCloudMultipartCR(sc, sync_pipe, instance.conf, source_conn, &src_obj,
+ target, &dest_obj, size, src_properties, rest_obj));
+ }
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ return 0;
+ }
+};
+
+class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ rgw_bucket_sync_pipe sync_pipe;
+ AWSSyncInstanceEnv& instance;
+ uint64_t versioned_epoch;
+public:
+ RGWAWSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ AWSSyncInstanceEnv& _instance, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
+ instance(_instance), versioned_epoch(_versioned_epoch) {
+ }
+
+ ~RGWAWSHandleRemoteObjCR() {}
+
+ RGWStatRemoteObjCBCR *allocate_callback() override {
+ return new RGWAWSHandleRemoteObjCBCR(sc, sync_pipe, key, instance, versioned_epoch);
+ }
+};
+
+class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
+ rgw_bucket_sync_pipe sync_pipe;
+ rgw_obj_key key;
+ ceph::real_time mtime;
+ AWSSyncInstanceEnv& instance;
+ int ret{0};
+public:
+ RGWAWSRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sc->cct), sc(_sc),
+ sync_pipe(_sync_pipe), key(_key),
+ mtime(_mtime), instance(_instance) {}
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+      ldout(sc->cct, 0) << "AWS: remove remote obj: z=" << sc->source_zone
+                        << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ yield {
+ instance.get_profile(sync_pipe.info.source_bs.bucket, &target);
+ string path = instance.conf.get_path(target, sync_pipe.dest_bucket_info, key);
+        ldout(sc->cct, 0) << "AWS: removing aws object at " << path << dendl;
+
+ call(new RGWDeleteRESTResourceCR(sc->cct, target->conn.get(),
+ sc->env->http_manager,
+ path, nullptr /* params */));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+
+};
+
+
+class RGWAWSDataSyncModule: public RGWDataSyncModule {
+ CephContext *cct;
+ AWSSyncInstanceEnv instance;
+public:
+ RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
+ cct(_cct),
+ instance(_conf) {
+ }
+
+ void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
+ instance.init(sc, instance_id);
+ }
+
+ ~RGWAWSDataSyncModule() {}
+
+ RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
+ std::optional<uint64_t> versioned_epoch,
+ rgw_zone_set *zones_trace) override {
+ ldout(sc->cct, 0) << instance.id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ return new RGWAWSHandleRemoteObjCR(sc, sync_pipe, key, instance, versioned_epoch.value_or(0));
+ }
+ RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch,
+ rgw_zone_set *zones_trace) override {
+    ldout(sc->cct, 0) << "rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return new RGWAWSRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, instance);
+ }
+ RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
+ rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch,
+ rgw_zone_set *zones_trace) override {
+    ldout(sc->cct, 0) << "AWS Not implemented: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
+ << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    return nullptr;
+ }
+};
+
+class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
+ RGWAWSDataSyncModule data_handler;
+public:
+ RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
+ RGWDataSyncModule *get_data_handler() override {
+ return &data_handler;
+ }
+};
+
+int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
+ AWSSyncConfig conf;
+
+ int r = conf.init(cct, config);
+ if (r < 0) {
+ return r;
+ }
+
+ instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
+ return 0;
+}