diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/rgw/rgw_bucket_sync.cc | |
parent | Initial commit. (diff) | |
download | ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_bucket_sync.cc')
-rw-r--r-- | src/rgw/rgw_bucket_sync.cc | 932 |
1 files changed, 932 insertions, 0 deletions
diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc new file mode 100644 index 000000000..357b56d9a --- /dev/null +++ b/src/rgw/rgw_bucket_sync.cc @@ -0,0 +1,932 @@ + + +#include "rgw_common.h" +#include "rgw_bucket_sync.h" +#include "rgw_data_sync.h" +#include "rgw_zone.h" + +#include "services/svc_zone.h" +#include "services/svc_bucket_sync.h" + +#define dout_subsys ceph_subsys_rgw + + +ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { + os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or(rgw_zone_id()) << ",az=" << (int)e.all_zones << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { + os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { + os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<rgw_zone_id>()) << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) { + os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} + +static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes, + const rgw_zone_id& source_zone, + const rgw_zone_id& dest_zone) +{ + std::vector<rgw_sync_bucket_pipe> relevant_pipes; + for (auto& p : pipes) { + if (p.source.match_zone(source_zone) && + p.dest.match_zone(dest_zone)) { + for (auto pipe : p.expand()) { + pipe.source.apply_zone(source_zone); + pipe.dest.apply_zone(dest_zone); + relevant_pipes.push_back(pipe); + } + } + } + + return relevant_pipes; +} + +static bool is_wildcard_bucket(const rgw_bucket& bucket) +{ + return bucket.name.empty(); +} + +void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const +{ + encode_json("zone", zone.id, f); + encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); + encode_json("sources", sources, f); + encode_json("dests", dests, f); +} + + +template <typename CB1, typename CB2> +void rgw_sync_group_pipe_map::try_add_to_pipe_map(const rgw_zone_id& source_zone, + const rgw_zone_id& dest_zone, + const std::vector<rgw_sync_bucket_pipes>& pipes, + zb_pipe_map_t *pipe_map, + CB1 filter_cb, + CB2 call_filter_cb) +{ + if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) { + return; + } + auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); + + for (auto& pipe : relevant_pipes) { + rgw_sync_bucket_entity zb; + if (!call_filter_cb(pipe, &zb)) { + continue; + } + pipe_map->insert(make_pair(zb, pipe)); + } +} + +template <typename CB> +void rgw_sync_group_pipe_map::try_add_source(const rgw_zone_id& source_zone, + const rgw_zone_id& dest_zone, + const std::vector<rgw_sync_bucket_pipes>& pipes, + CB filter_cb) +{ + return try_add_to_pipe_map(source_zone, dest_zone, pipes, + &sources, + filter_cb, + [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { + *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()}; + return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); + }); +} + +template <typename CB> +void rgw_sync_group_pipe_map::try_add_dest(const rgw_zone_id& source_zone, + const rgw_zone_id& dest_zone, + const std::vector<rgw_sync_bucket_pipes>& pipes, + CB filter_cb) +{ + return try_add_to_pipe_map(source_zone, dest_zone, pipes, + &dests, + filter_cb, + [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { + *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()}; + return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); + }); +} + +using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t; + +pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m, + const rgw_zone_id& zone, + std::optional<rgw_bucket> b) const +{ + if (!b) { + return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()}); + } + + auto zb = rgw_sync_bucket_entity{zone, *b}; + + auto range = m.equal_range(zb); + if (range.first == range.second && + !is_wildcard_bucket(*b)) { + /* couldn't find the specific bucket, try to find by wildcard */ + zb.bucket = rgw_bucket(); + range = m.equal_range(zb); + } + + return range; +} + + +template <typename CB> +void rgw_sync_group_pipe_map::init(CephContext *cct, + const rgw_zone_id& _zone, + std::optional<rgw_bucket> _bucket, + const rgw_sync_policy_group& group, + rgw_sync_data_flow_group *_default_flow, + std::set<rgw_zone_id> *_pall_zones, + CB filter_cb) { + zone = _zone; + bucket = _bucket; + default_flow = _default_flow; + pall_zones = _pall_zones; + + rgw_sync_bucket_entity zb(zone, bucket); + + status = group.status; + + std::vector<rgw_sync_bucket_pipes> zone_pipes; + + string bucket_key = (bucket ? bucket->get_key() : "*"); + + /* only look at pipes that touch the specific zone and bucket */ + for (auto& pipe : group.pipes) { + if (pipe.contains_zone_bucket(zone, bucket)) { + ldout(cct, 20) << __func__ << "(): pipe_map (zone=" << zone << " bucket=" << bucket_key << "): adding potential pipe: " << pipe << dendl; + zone_pipes.push_back(pipe); + } + } + + const rgw_sync_data_flow_group *pflow; + + if (!group.data_flow.empty()) { + pflow = &group.data_flow; + } else { + if (!default_flow) { + return; + } + pflow = default_flow; + } + + auto& flow = *pflow; + + pall_zones->insert(zone); + + /* symmetrical */ + for (auto& symmetrical_group : flow.symmetrical) { + if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { + for (auto& z : symmetrical_group.zones) { + if (z != zone) { + pall_zones->insert(z); + try_add_source(z, zone, zone_pipes, filter_cb); + try_add_dest(zone, z, zone_pipes, filter_cb); + } + } + } + } + + /* directional */ + for (auto& rule : flow.directional) { + if (rule.source_zone == zone) { + pall_zones->insert(rule.dest_zone); + try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); + } else if (rule.dest_zone == zone) { + pall_zones->insert(rule.source_zone); + try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); + } + } +} + +/* + * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} + */ +vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const rgw_zone_id& source_zone, + std::optional<rgw_bucket> source_bucket, + std::optional<rgw_bucket> dest_bucket) const { + vector<rgw_sync_bucket_pipe> result; + + auto range = find_pipes(sources, source_zone, source_bucket); + + for (auto iter = range.first; iter != range.second; ++iter) { + auto pipe = iter->second; + if (pipe.dest.match_bucket(dest_bucket)) { + result.push_back(pipe); + } + } + return result; +} + +/* + * find all relevant pipes in other zones that pull from a specific + * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} + */ +vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket, + const rgw_zone_id& dest_zone, + std::optional<rgw_bucket> dest_bucket) const { + vector<rgw_sync_bucket_pipe> result; + + auto range = find_pipes(dests, dest_zone, dest_bucket); + + for (auto iter = range.first; iter != range.second; ++iter) { + auto pipe = iter->second; + if (pipe.source.match_bucket(source_bucket)) { + result.push_back(pipe); + } + } + + return result; +} + +/* + * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} + */ +vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const rgw_zone_id& source_zone, + std::optional<rgw_bucket> source_bucket, + const rgw_zone_id& dest_zone, + std::optional<rgw_bucket> dest_bucket) const { + if (dest_zone == zone) { + return find_source_pipes(source_zone, source_bucket, dest_bucket); + } + + if (source_zone == zone) { + return find_dest_pipes(source_bucket, dest_zone, dest_bucket); + } + + return vector<rgw_sync_bucket_pipe>(); +} + +void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) +{ + pipes.push_back(pipe); + + auto ppipe = &pipes.back(); + auto prefix = ppipe->params.source.filter.prefix.value_or(string()); + + prefix_refs.insert(make_pair(prefix, ppipe)); + + for (auto& t : ppipe->params.source.filter.tags) { + string tag = t.key + "=" + t.value; + auto titer = tag_refs.find(tag); + if (titer != tag_refs.end() && + ppipe->params.priority > titer->second->params.priority) { + titer->second = ppipe; + } else { + tag_refs[tag] = ppipe; + } + } +} + +bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key, + std::optional<rgw_user> *user, + std::optional<rgw_user> *acl_translation_owner, + std::optional<string> *storage_class, + rgw_sync_pipe_params::Mode *mode, + bool *need_more_info) const +{ + std::optional<string> owner; + + *need_more_info = false; + + if (prefix_refs.empty()) { + return false; + } + + auto end = prefix_refs.upper_bound(key.name); + auto iter = end; + if (iter != prefix_refs.begin()) { + --iter; + } + if (iter == prefix_refs.end()) { + return false; + } + + if (iter != prefix_refs.begin()) { + iter = prefix_refs.find(iter->first); /* prefix_refs is multimap, find first element + holding that key */ + } + + std::vector<decltype(iter)> iters; + + std::optional<int> priority; + + for (; iter != end; ++iter) { + auto& prefix = iter->first; + if (!boost::starts_with(key.name, prefix)) { + continue; + } + + auto& rule_params = iter->second->params; + auto& filter = rule_params.source.filter; + + if (rule_params.priority > priority) { + priority = rule_params.priority; + + if (!filter.has_tags()) { + iters.clear(); + } + iters.push_back(iter); + + *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then + we can't be sure if it would be used. + We need to first read the info from the source object */ + } + } + + if (iters.empty()) { + return false; + } + + bool conflict = false; + + std::optional<rgw_user> _user; + std::optional<rgw_sync_pipe_acl_translation> _acl_translation; + std::optional<string> _storage_class; + rgw_sync_pipe_params::Mode _mode; + + int i = 0; + for (auto& iter : iters) { + auto& rule_params = iter->second->params; + if (++i == 0) { + _user = rule_params.user; + _acl_translation = rule_params.dest.acl_translation; + _storage_class = rule_params.dest.storage_class; + _mode = rule_params.mode; + continue; + } + + conflict = !(_user == rule_params.user && + _acl_translation == rule_params.dest.acl_translation && + _storage_class == rule_params.dest.storage_class && + _mode == rule_params.mode); + if (conflict) { + *need_more_info = true; + return false; + } + } + + *user = _user; + if (_acl_translation) { + *acl_translation_owner = _acl_translation->owner; + } + *storage_class = _storage_class; + *mode = _mode; + + return true; +} + +bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key, + const RGWObjTags::tag_map_t& tags, + rgw_sync_pipe_params *params) const +{ + if (prefix_refs.empty()) { + return false; + } + + auto iter = prefix_refs.upper_bound(key.name); + if (iter != prefix_refs.begin()) { + --iter; + } + if (iter == prefix_refs.end()) { + return false; + } + + auto end = prefix_refs.upper_bound(key.name); + auto max = end; + + std::optional<int> priority; + + for (; iter != end; ++iter) { + /* NOTE: this is not the most efficient way to do it, + * a trie data structure would be better + */ + auto& prefix = iter->first; + if (!boost::starts_with(key.name, prefix)) { + continue; + } + + auto& rule_params = iter->second->params; + auto& filter = rule_params.source.filter; + + if (!filter.check_tags(tags)) { + continue; + } + + if (rule_params.priority > priority) { + priority = rule_params.priority; + max = iter; + } + } + + if (max == end) { + return false; + } + + *params = max->second->params; + return true; +} + +/* + * return either the current prefix for s, or the next one if s is not within a prefix + */ + +RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const +{ + if (prefix_refs.empty()) { + return prefix_refs.end(); + } + auto next = prefix_refs.upper_bound(s); + auto iter = next; + if (iter != prefix_refs.begin()) { + --iter; + } + if (!boost::starts_with(s, iter->first)) { + return next; + } + + return iter; +} + +void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) { + pipe_map.insert(make_pair(pipe.id, pipe)); + + auto& rules_ref = rules[endpoints_pair(pipe)]; + + if (!rules_ref) { + rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>(); + } + + rules_ref->insert(pipe); + + pipe_handler h(rules_ref, pipe); + + handlers.insert(h); +} + +void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const +{ + encode_json("pipes", pipe_map, f); +} + +bool RGWBucketSyncFlowManager::allowed_data_flow(const rgw_zone_id& source_zone, + std::optional<rgw_bucket> source_bucket, + const rgw_zone_id& dest_zone, + std::optional<rgw_bucket> dest_bucket, + bool check_activated) const +{ + bool found = false; + bool found_activated = false; + + for (auto m : flow_groups) { + auto& fm = m.second; + auto pipes = fm.find_pipes(source_zone, source_bucket, + dest_zone, dest_bucket); + + bool is_found = !pipes.empty(); + + if (is_found) { + switch (fm.status) { + case rgw_sync_policy_group::Status::FORBIDDEN: + return false; + case rgw_sync_policy_group::Status::ENABLED: + found = true; + found_activated = true; + break; + case rgw_sync_policy_group::Status::ALLOWED: + found = true; + break; + default: + break; /* unknown -- ignore */ + } + } + } + + if (check_activated && found_activated) { + return true; + } + + return found; +} + +void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { + std::optional<rgw_sync_data_flow_group> default_flow; + if (parent) { + default_flow.emplace(); + default_flow->init_default(parent->all_zones); + } + + for (auto& item : sync_policy.groups) { + auto& group = item.second; + auto& flow_group_map = flow_groups[group.id]; + + flow_group_map.init(cct, zone_id, bucket, group, + (default_flow ? &(*default_flow) : nullptr), + &all_zones, + [&](const rgw_zone_id& source_zone, + std::optional<rgw_bucket> source_bucket, + const rgw_zone_id& dest_zone, + std::optional<rgw_bucket> dest_bucket) { + if (!parent) { + return true; + } + return parent->allowed_data_flow(source_zone, + source_bucket, + dest_zone, + dest_bucket, + false); /* just check that it's not disabled */ + }); + } +} + +void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket, + RGWBucketSyncFlowManager::pipe_set *source_pipes, + RGWBucketSyncFlowManager::pipe_set *dest_pipes, + bool only_enabled) const + +{ + string effective_bucket_key; + if (effective_bucket) { + effective_bucket_key = effective_bucket->get_key(); + } + if (parent) { + parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled); + } + + for (auto& item : flow_groups) { + auto& flow_group_map = item.second; + + /* only return enabled groups */ + if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED && + (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) { + continue; + } + + for (auto& entry : flow_group_map.sources) { + rgw_sync_bucket_pipe pipe = entry.second; + if (!pipe.dest.match_bucket(effective_bucket)) { + continue; + } + + pipe.source.apply_bucket(effective_bucket); + pipe.dest.apply_bucket(effective_bucket); + + ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding source pipe: " << pipe << dendl; + source_pipes->insert(pipe); + } + + for (auto& entry : flow_group_map.dests) { + rgw_sync_bucket_pipe pipe = entry.second; + + if (!pipe.source.match_bucket(effective_bucket)) { + continue; + } + + pipe.source.apply_bucket(effective_bucket); + pipe.dest.apply_bucket(effective_bucket); + + ldout(cct, 20) << __func__ << "(): flow manager (bucket=" << effective_bucket_key << "): adding dest pipe: " << pipe << dendl; + dest_pipes->insert(pipe); + } + } +} + + +RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(CephContext *_cct, + const rgw_zone_id& _zone_id, + std::optional<rgw_bucket> _bucket, + const RGWBucketSyncFlowManager *_parent) : cct(_cct), + zone_id(_zone_id), + bucket(_bucket), + parent(_parent) {} + + +void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, + RGWSI_SyncModules *sync_modules_svc, + rgw_sync_policy_info *ppolicy) +{ + bool found = false; + + rgw_sync_policy_info policy; + + auto& group = policy.groups["default"]; + auto& zonegroup = zone_svc->get_zonegroup(); + + for (const auto& ziter1 : zonegroup.zones) { + auto& id1 = ziter1.first; + const RGWZone& z1 = ziter1.second; + + for (const auto& ziter2 : zonegroup.zones) { + auto& id2 = ziter2.first; + const RGWZone& z2 = ziter2.second; + + if (id1 == id2) { + continue; + } + + if (z1.syncs_from(z2.name)) { + found = true; + rgw_sync_directional_rule *rule; + group.data_flow.find_or_create_directional(id2, + id1, + &rule); + } + } + } + + if (!found) { /* nothing syncs */ + return; + } + + rgw_sync_bucket_pipes pipes; + pipes.id = "all"; + pipes.source.all_zones = true; + pipes.dest.all_zones = true; + + group.pipes.emplace_back(std::move(pipes)); + + + group.status = rgw_sync_policy_group::Status::ENABLED; + + *ppolicy = std::move(policy); +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, + RGWSI_SyncModules *sync_modules_svc, + RGWSI_Bucket_Sync *_bucket_sync_svc, + std::optional<rgw_zone_id> effective_zone) : zone_svc(_zone_svc) , + bucket_sync_svc(_bucket_sync_svc) { + zone_id = effective_zone.value_or(zone_svc->zone_id()); + flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), + zone_id, + nullopt, + nullptr)); + sync_policy = zone_svc->get_zonegroup().sync_policy; + + if (sync_policy.empty()) { + RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy); + legacy_config = true; + } +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const RGWBucketInfo& _bucket_info, + map<string, bufferlist>&& _bucket_attrs) : parent(_parent), + bucket_info(_bucket_info), + bucket_attrs(std::move(_bucket_attrs)) { + if (_bucket_info.sync_policy) { + sync_policy = *_bucket_info.sync_policy; + + for (auto& entry : sync_policy.groups) { + for (auto& pipe : entry.second.pipes) { + if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER && + pipe.params.user.empty()) { + pipe.params.user = _bucket_info.owner; + } + } + } + } + legacy_config = parent->legacy_config; + bucket = _bucket_info.bucket; + zone_svc = parent->zone_svc; + bucket_sync_svc = parent->bucket_sync_svc; + flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), + parent->zone_id, + _bucket_info.bucket, + parent->flow_mgr.get())); +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const rgw_bucket& _bucket, + std::optional<rgw_sync_policy_info> _sync_policy) : parent(_parent) { + if (_sync_policy) { + sync_policy = *_sync_policy; + } + legacy_config = parent->legacy_config; + bucket = _bucket; + zone_svc = parent->zone_svc; + bucket_sync_svc = parent->bucket_sync_svc; + flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->ctx(), + parent->zone_id, + _bucket, + parent->flow_mgr.get())); +} + +RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info, + map<string, bufferlist>&& bucket_attrs) const +{ + return new RGWBucketSyncPolicyHandler(this, bucket_info, std::move(bucket_attrs)); +} + +RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket, + std::optional<rgw_sync_policy_info> sync_policy) const +{ + return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy); +} + +int RGWBucketSyncPolicyHandler::init(const DoutPrefixProvider *dpp, optional_yield y) +{ + int r = bucket_sync_svc->get_bucket_sync_hints(dpp, bucket.value_or(rgw_bucket()), + &source_hints, + &target_hints, + y); + if (r < 0) { + ldpp_dout(dpp, 0) << "ERROR: failed to initialize bucket sync policy handler: get_bucket_sync_hints() on bucket=" + << bucket << " returned r=" << r << dendl; + return r; + } + + flow_mgr->init(sync_policy); + + reflect(&source_pipes, + &target_pipes, + &sources, + &targets, + &source_zones, + &target_zones, + true); + + return 0; +} + +void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psource_pipes, + RGWBucketSyncFlowManager::pipe_set *ptarget_pipes, + map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *psources, + map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> *ptargets, + std::set<rgw_zone_id> *psource_zones, + std::set<rgw_zone_id> *ptarget_zones, + bool only_enabled) const +{ + RGWBucketSyncFlowManager::pipe_set _source_pipes; + RGWBucketSyncFlowManager::pipe_set _target_pipes; + map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _sources; + map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set> _targets; + std::set<rgw_zone_id> _source_zones; + std::set<rgw_zone_id> _target_zones; + + flow_mgr->reflect(bucket, &_source_pipes, &_target_pipes, only_enabled); + + for (auto& entry : _source_pipes.pipe_map) { + auto& pipe = entry.second; + if (!pipe.source.zone) { + continue; + } + _source_zones.insert(*pipe.source.zone); + _sources[*pipe.source.zone].insert(pipe); + } + + for (auto& entry : _target_pipes.pipe_map) { + auto& pipe = entry.second; + if (!pipe.dest.zone) { + continue; + } + _target_zones.insert(*pipe.dest.zone); + _targets[*pipe.dest.zone].insert(pipe); + } + + if (psource_pipes) { + *psource_pipes = std::move(_source_pipes); + } + if (ptarget_pipes) { + *ptarget_pipes = std::move(_target_pipes); + } + if (psources) { + *psources = std::move(_sources); + } + if (ptargets) { + *ptargets = std::move(_targets); + } + if (psource_zones) { + *psource_zones = std::move(_source_zones); + } + if (ptarget_zones) { + *ptarget_zones = std::move(_target_zones); + } +} + +multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_sources() const +{ + multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; + + for (auto& source_entry : sources) { + auto& zone_id = source_entry.first; + + auto& pipes = source_entry.second.pipe_map; + + for (auto& entry : pipes) { + auto& pipe = entry.second; + m.insert(make_pair(zone_id, pipe)); + } + } + + for (auto& pipe : resolved_sources) { + if (!pipe.source.zone) { + continue; + } + + m.insert(make_pair(*pipe.source.zone, pipe)); + } + + return m; +} + +multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests() const +{ + multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; + + for (auto& dest_entry : targets) { + auto& zone_id = dest_entry.first; + + auto& pipes = dest_entry.second.pipe_map; + + for (auto& entry : pipes) { + auto& pipe = entry.second; + m.insert(make_pair(zone_id, pipe)); + } + } + + for (auto& pipe : resolved_dests) { + if (!pipe.dest.zone) { + continue; + } + + m.insert(make_pair(*pipe.dest.zone, pipe)); + } + + return m; +} + +multimap<rgw_zone_id, rgw_sync_bucket_pipe> RGWBucketSyncPolicyHandler::get_all_dests_in_zone(const rgw_zone_id& zone_id) const +{ + multimap<rgw_zone_id, rgw_sync_bucket_pipe> m; + + auto iter = targets.find(zone_id); + if (iter != targets.end()) { + auto& pipes = iter->second.pipe_map; + + for (auto& entry : pipes) { + auto& pipe = entry.second; + m.insert(make_pair(zone_id, pipe)); + } + } + + for (auto& pipe : resolved_dests) { + if (!pipe.dest.zone || + *pipe.dest.zone != zone_id) { + continue; + } + + m.insert(make_pair(*pipe.dest.zone, pipe)); + } + + return m; +} + +void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *_sources, std::set<rgw_sync_bucket_pipe> *_targets, + std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes */ + for (auto& entry : source_pipes.pipe_map) { + auto& source_pipe = entry.second; + if (!filter_peer || + source_pipe.source.match(*filter_peer)) { + _sources->insert(source_pipe); + } + } + + for (auto& entry : target_pipes.pipe_map) { + auto& target_pipe = entry.second; + if (!filter_peer || + target_pipe.dest.match(*filter_peer)) { + _targets->insert(target_pipe); + } + } +} + +bool RGWBucketSyncPolicyHandler::bucket_exports_data() const +{ + if (!bucket) { + return false; + } + + if (bucket_is_sync_source()) { + return true; + } + + return (zone_svc->need_to_log_data() && + bucket_info->datasync_flag_enabled()); +} + +bool RGWBucketSyncPolicyHandler::bucket_imports_data() const +{ + return bucket_is_sync_target(); +} + |