summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_sync_module_pubsub.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rgw/rgw_sync_module_pubsub.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_sync_module_pubsub.cc')
-rw-r--r--src/rgw/rgw_sync_module_pubsub.cc1578
1 files changed, 1578 insertions, 0 deletions
diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc
new file mode 100644
index 00000000..fd514b81
--- /dev/null
+++ b/src/rgw/rgw_sync_module_pubsub.cc
@@ -0,0 +1,1578 @@
+#include "rgw_common.h"
+#include "rgw_coroutine.h"
+#include "rgw_sync_module.h"
+#include "rgw_data_sync.h"
+#include "rgw_sync_module_pubsub.h"
+#include "rgw_sync_module_pubsub_rest.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_cr_tools.h"
+#include "rgw_op.h"
+#include "rgw_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_notify_event_type.h"
+#include "rgw_perf_counters.h"
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+#include "rgw_amqp.h"
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+#include "rgw_kafka.h"
+#endif
+
+#include <boost/algorithm/hex.hpp>
+#include <boost/asio/yield.hpp>
+
+#define dout_subsys ceph_subsys_rgw
+
+
+#define PUBSUB_EVENTS_RETENTION_DEFAULT 7
+
+/*
+
+config:
+
+{
+ "tenant": <tenant>, # default: <empty>
+ "uid": <uid>, # default: "pubsub"
+ "data_bucket_prefix": <prefix> # default: "pubsub-"
+ "data_oid_prefix": <prefix> #
+ "events_retention_days": <int> # default: 7
+ "start_with_full_sync" <bool> # default: false
+
+ # non-dynamic config
+ "notifications": [
+ {
+ "path": <notification-path>, # this can be either an explicit path: <bucket>, or <bucket>/<object>,
+ # or a prefix if it ends with a wildcard
+ "topic": <topic-name>
+ },
+ ...
+ ],
+ "subscriptions": [
+ {
+ "name": <subscription-name>,
+ "topic": <topic>,
+ "push_endpoint": <endpoint>,
+ "push_endpoint_args:" <arg list>. # any push endpoint specific args (include all args)
+ "data_bucket": <bucket>, # override name of bucket where subscription data will be store
+ "data_oid_prefix": <prefix> # set prefix for subscription data object ids
+ "s3_id": <id> # in case of S3 compatible notifications, the notification ID will be set here
+ },
+ ...
+ ]
+}
+
+*/
+
+// utility function to convert the args list from string format
+// (ampresend separated with equal sign) to prased structure
+RGWHTTPArgs string_to_args(const std::string& str_args) {
+ RGWHTTPArgs args;
+ args.set(str_args);
+ args.parse();
+ return args;
+}
+
+struct PSSubConfig {
+ std::string name;
+ std::string topic;
+ std::string push_endpoint_name;
+ std::string push_endpoint_args;
+ std::string data_bucket_name;
+ std::string data_oid_prefix;
+ std::string s3_id;
+ std::string arn_topic;
+ RGWPubSubEndpoint::Ptr push_endpoint;
+
+ void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc) {
+ name = uc.name;
+ topic = uc.topic;
+ push_endpoint_name = uc.dest.push_endpoint;
+ data_bucket_name = uc.dest.bucket_name;
+ data_oid_prefix = uc.dest.oid_prefix;
+ s3_id = uc.s3_id;
+ arn_topic = uc.dest.arn_topic;
+ if (!push_endpoint_name.empty()) {
+ push_endpoint_args = uc.dest.push_endpoint_args;
+ try {
+ push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
+ ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
+ } catch (const RGWPubSubEndpoint::configuration_error& e) {
+ ldout(cct, 1) << "ERROR: failed to create push endpoint: "
+ << push_endpoint_name << " due to: " << e.what() << dendl;
+ }
+ }
+ }
+
+ void dump(Formatter *f) const {
+ encode_json("name", name, f);
+ encode_json("topic", topic, f);
+ encode_json("push_endpoint", push_endpoint_name, f);
+ encode_json("push_endpoint_args", push_endpoint_args, f);
+ encode_json("data_bucket_name", data_bucket_name, f);
+ encode_json("data_oid_prefix", data_oid_prefix, f);
+ encode_json("s3_id", s3_id, f);
+ }
+
+ void init(CephContext *cct, const JSONFormattable& config,
+ const string& data_bucket_prefix,
+ const string& default_oid_prefix) {
+ name = config["name"];
+ topic = config["topic"];
+ push_endpoint_name = config["push_endpoint"];
+ string default_bucket_name = data_bucket_prefix + name;
+ data_bucket_name = config["data_bucket"](default_bucket_name.c_str());
+ data_oid_prefix = config["data_oid_prefix"](default_oid_prefix.c_str());
+ s3_id = config["s3_id"];
+ arn_topic = config["arn_topic"];
+ if (!push_endpoint_name.empty()) {
+ push_endpoint_args = config["push_endpoint_args"];
+ try {
+ push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args), cct);
+ ldout(cct, 20) << "push endpoint created: " << push_endpoint->to_str() << dendl;
+ } catch (const RGWPubSubEndpoint::configuration_error& e) {
+ ldout(cct, 1) << "ERROR: failed to create push endpoint: "
+ << push_endpoint_name << " due to: " << e.what() << dendl;
+ }
+ }
+ }
+};
+
+using PSSubConfigRef = std::shared_ptr<PSSubConfig>;
+
+struct PSTopicConfig {
+ std::string name;
+ std::set<std::string> subs;
+ std::string opaque_data;
+
+ void dump(Formatter *f) const {
+ encode_json("name", name, f);
+ encode_json("subs", subs, f);
+ encode_json("opaque", opaque_data, f);
+ }
+};
+
+struct PSNotificationConfig {
+ uint64_t id{0};
+ string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
+ string topic;
+ bool is_prefix{false};
+
+
+ void dump(Formatter *f) const {
+ encode_json("id", id, f);
+ encode_json("path", path, f);
+ encode_json("topic", topic, f);
+ encode_json("is_prefix", is_prefix, f);
+ }
+
+ void init(CephContext *cct, const JSONFormattable& config) {
+ path = config["path"];
+ if (!path.empty() && path[path.size() - 1] == '*') {
+ path = path.substr(0, path.size() - 1);
+ is_prefix = true;
+ }
+ topic = config["topic"];
+ }
+};
+
+template<class T>
+static string json_str(const char *name, const T& obj, bool pretty = false)
+{
+ stringstream ss;
+ JSONFormatter f(pretty);
+
+ encode_json(name, obj, &f);
+ f.flush(ss);
+
+ return ss.str();
+}
+
+using PSTopicConfigRef = std::shared_ptr<PSTopicConfig>;
+using TopicsRef = std::shared_ptr<std::vector<PSTopicConfigRef>>;
+
+struct PSConfig {
+ const std::string id{"pubsub"};
+ rgw_user user;
+ std::string data_bucket_prefix;
+ std::string data_oid_prefix;
+
+ int events_retention_days{0};
+
+ uint64_t sync_instance{0};
+ uint64_t max_id{0};
+
+ /* FIXME: no hard coded buckets, we'll have configurable topics */
+ std::map<std::string, PSSubConfigRef> subs;
+ std::map<std::string, PSTopicConfigRef> topics;
+ std::multimap<std::string, PSNotificationConfig> notifications;
+
+ bool start_with_full_sync{false};
+
+ void dump(Formatter *f) const {
+ encode_json("id", id, f);
+ encode_json("user", user, f);
+ encode_json("data_bucket_prefix", data_bucket_prefix, f);
+ encode_json("data_oid_prefix", data_oid_prefix, f);
+ encode_json("events_retention_days", events_retention_days, f);
+ encode_json("sync_instance", sync_instance, f);
+ encode_json("max_id", max_id, f);
+ {
+ Formatter::ArraySection section(*f, "subs");
+ for (auto& sub : subs) {
+ encode_json("sub", *sub.second, f);
+ }
+ }
+ {
+ Formatter::ArraySection section(*f, "topics");
+ for (auto& topic : topics) {
+ encode_json("topic", *topic.second, f);
+ }
+ }
+ {
+ Formatter::ObjectSection section(*f, "notifications");
+ std::string last;
+ for (auto& notif : notifications) {
+ const string& n = notif.first;
+ if (n != last) {
+ if (!last.empty()) {
+ f->close_section();
+ }
+ f->open_array_section(n.c_str());
+ }
+ last = n;
+ encode_json("notifications", notif.second, f);
+ }
+ if (!last.empty()) {
+ f->close_section();
+ }
+ }
+ encode_json("start_with_full_sync", start_with_full_sync, f);
+ }
+
+ void init(CephContext *cct, const JSONFormattable& config) {
+ string uid = config["uid"]("pubsub");
+ user = rgw_user(config["tenant"], uid);
+ data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
+ data_oid_prefix = config["data_oid_prefix"];
+ events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
+
+ for (auto& c : config["notifications"].array()) {
+ PSNotificationConfig nc;
+ nc.id = ++max_id;
+ nc.init(cct, c);
+ notifications.insert(std::make_pair(nc.path, nc));
+
+ PSTopicConfig topic_config = { .name = nc.topic };
+ topics[nc.topic] = make_shared<PSTopicConfig>(topic_config);
+ }
+ for (auto& c : config["subscriptions"].array()) {
+ auto sc = std::make_shared<PSSubConfig>();
+ sc->init(cct, c, data_bucket_prefix, data_oid_prefix);
+ subs[sc->name] = sc;
+ auto iter = topics.find(sc->topic);
+ if (iter != topics.end()) {
+ iter->second->subs.insert(sc->name);
+ }
+ }
+ start_with_full_sync = config["start_with_full_sync"](false);
+
+ ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
+ }
+
+ void init_instance(const RGWRealm& realm, uint64_t instance_id) {
+ sync_instance = instance_id;
+ }
+
+ void get_topics(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, TopicsRef *result) {
+ const std::string path = bucket.name + "/" + key.name;
+
+ auto iter = notifications.upper_bound(path);
+ if (iter == notifications.begin()) {
+ return;
+ }
+
+ do {
+ --iter;
+ if (iter->first.size() > path.size()) {
+ break;
+ }
+ if (path.compare(0, iter->first.size(), iter->first) != 0) {
+ break;
+ }
+
+ PSNotificationConfig& target = iter->second;
+
+ if (!target.is_prefix &&
+ path.size() != iter->first.size()) {
+ continue;
+ }
+
+ auto topic = topics.find(target.topic);
+ if (topic == topics.end()) {
+ continue;
+ }
+
+ ldout(cct, 20) << ": found topic for path=" << bucket << "/" << key << ": id=" << target.id <<
+ " target_path=" << target.path << ", topic=" << target.topic << dendl;
+ (*result)->push_back(topic->second);
+ } while (iter != notifications.begin());
+ }
+
+ bool find_sub(const string& name, PSSubConfigRef *ref) {
+ auto iter = subs.find(name);
+ if (iter != subs.end()) {
+ *ref = iter->second;
+ return true;
+ }
+ return false;
+ }
+};
+
+using PSConfigRef = std::shared_ptr<PSConfig>;
+template<typename EventType>
+using EventRef = std::shared_ptr<EventType>;
+
+struct objstore_event {
+ string id;
+ const rgw_bucket& bucket;
+ const rgw_obj_key& key;
+ const ceph::real_time& mtime;
+ const std::vector<std::pair<std::string, std::string> > *attrs;
+
+ objstore_event(const rgw_bucket& _bucket,
+ const rgw_obj_key& _key,
+ const ceph::real_time& _mtime,
+ const std::vector<std::pair<std::string, std::string> > *_attrs) : bucket(_bucket),
+ key(_key),
+ mtime(_mtime),
+ attrs(_attrs) {}
+
+ string get_hash() {
+ string etag;
+ RGWMD5Etag hash;
+ hash.update(bucket.bucket_id);
+ hash.update(key.name);
+ hash.update(key.instance);
+ hash.finish(&etag);
+
+ assert(etag.size() > 8);
+
+ return etag.substr(0, 8);
+ }
+
+ void dump(Formatter *f) const {
+ {
+ Formatter::ObjectSection s(*f, "bucket");
+ encode_json("name", bucket.name, f);
+ encode_json("tenant", bucket.tenant, f);
+ encode_json("bucket_id", bucket.bucket_id, f);
+ }
+ {
+ Formatter::ObjectSection s(*f, "key");
+ encode_json("name", key.name, f);
+ encode_json("instance", key.instance, f);
+ }
+ utime_t mt(mtime);
+ encode_json("mtime", mt, f);
+ Formatter::ObjectSection s(*f, "attrs");
+ if (attrs) {
+ for (auto& attr : *attrs) {
+ encode_json(attr.first.c_str(), attr.second.c_str(), f);
+ }
+ }
+ }
+};
+
+static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
+ const rgw_obj_key& key,
+ const ceph::real_time& mtime,
+ const std::vector<std::pair<std::string, std::string> > *attrs,
+ rgw::notify::EventType event_type,
+ EventRef<rgw_pubsub_event> *event) {
+ *event = std::make_shared<rgw_pubsub_event>();
+
+ EventRef<rgw_pubsub_event>& e = *event;
+ e->event_name = rgw::notify::to_ceph_string(event_type);
+ e->source = bucket.name + "/" + key.name;
+ e->timestamp = real_clock::now();
+
+ objstore_event oevent(bucket, key, mtime, attrs);
+
+ const utime_t ts(e->timestamp);
+ set_event_id(e->id, oevent.get_hash(), ts);
+
+ encode_json("info", oevent, &e->info);
+}
+
+static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+ const rgw_user& owner,
+ const rgw_obj_key& key,
+ const ceph::real_time& mtime,
+ const std::vector<std::pair<std::string, std::string> > *attrs,
+ rgw::notify::EventType event_type,
+ EventRef<rgw_pubsub_s3_record> *record) {
+ *record = std::make_shared<rgw_pubsub_s3_record>();
+
+ EventRef<rgw_pubsub_s3_record>& r = *record;
+ r->eventTime = mtime;
+ r->eventName = rgw::notify::to_string(event_type);
+ // userIdentity: not supported in sync module
+ // x_amz_request_id: not supported in sync module
+ // x_amz_id_2: not supported in sync module
+ // configurationId is filled from subscription configuration
+ r->bucket_name = bucket.name;
+ r->bucket_ownerIdentity = owner.to_str();
+ r->bucket_arn = to_string(rgw::ARN(bucket));
+ r->bucket_id = bucket.bucket_id; // rgw extension
+ r->object_key = key.name;
+ // object_size not supported in sync module
+ objstore_event oevent(bucket, key, mtime, attrs);
+ r->object_etag = oevent.get_hash();
+ r->object_versionId = key.instance;
+
+ // use timestamp as per key sequence id (hex encoded)
+ const utime_t ts(real_clock::now());
+ boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
+ std::back_inserter(r->object_sequencer));
+
+ set_event_id(r->id, r->object_etag, ts);
+}
+
+class PSManager;
+using PSManagerRef = std::shared_ptr<PSManager>;
+
+struct PSEnv {
+ PSConfigRef conf;
+ shared_ptr<RGWUserInfo> data_user_info;
+ PSManagerRef manager;
+
+ PSEnv() : conf(make_shared<PSConfig>()),
+ data_user_info(make_shared<RGWUserInfo>()) {}
+
+ void init(CephContext *cct, const JSONFormattable& config) {
+ conf->init(cct, config);
+ }
+
+ void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
+};
+
+using PSEnvRef = std::shared_ptr<PSEnv>;
+
+template<typename EventType>
+class PSEvent {
+ const EventRef<EventType> event;
+
+public:
+ PSEvent(const EventRef<EventType>& _event) : event(_event) {}
+
+ void format(bufferlist *bl) const {
+ bl->append(json_str("", *event));
+ }
+
+ void encode_event(bufferlist& bl) const {
+ encode(*event, bl);
+ }
+
+ const string& id() const {
+ return event->id;
+ }
+};
+
+template <class T>
+class RGWSingletonCR : public RGWCoroutine {
+ friend class WrapperCR;
+
+ boost::asio::coroutine wrapper_state;
+ bool started{false};
+ int operate_ret{0};
+
+ struct WaiterInfo {
+ RGWCoroutine *cr{nullptr};
+ T *result;
+ };
+ using WaiterInfoRef = std::shared_ptr<WaiterInfo>;
+
+ deque<WaiterInfoRef> waiters;
+
+ void add_waiter(RGWCoroutine *cr, T *result) {
+ auto waiter = std::make_shared<WaiterInfo>();
+ waiter->cr = cr;
+ waiter->result = result;
+ waiters.push_back(waiter);
+ };
+
+ bool get_next_waiter(WaiterInfoRef *waiter) {
+ if (waiters.empty()) {
+ waiter->reset();
+ return false;
+ }
+
+ *waiter = waiters.front();
+ waiters.pop_front();
+ return true;
+ }
+
+ int operate_wrapper() override {
+ reenter(&wrapper_state) {
+ while (!is_done()) {
+ ldout(cct, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
+ operate_ret = operate();
+ if (operate_ret < 0) {
+ ldout(cct, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
+ }
+ if (!is_done()) {
+ yield;
+ }
+ }
+
+ ldout(cct, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
+ /* we're done, can't yield anymore */
+
+ WaiterInfoRef waiter;
+ while (get_next_waiter(&waiter)) {
+ ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
+ waiter->cr->set_retcode(retcode);
+ waiter->cr->set_sleeping(false);
+ return_result(waiter->result);
+ put();
+ }
+
+ return retcode;
+ }
+ return 0;
+ }
+
+ virtual void return_result(T *result) {}
+
+public:
+ RGWSingletonCR(CephContext *_cct)
+ : RGWCoroutine(_cct) {}
+
+ int execute(RGWCoroutine *caller, T *result = nullptr) {
+ if (!started) {
+ ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
+ started = true;
+ caller->call(this);
+ return 0;
+ } else if (!is_done()) {
+ ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
+ get();
+ add_waiter(caller, result);
+ caller->set_sleeping(true);
+ return 0;
+ }
+
+ ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
+ caller->set_retcode(retcode);
+ return_result(result);
+ return retcode;
+ }
+};
+
+
+class PSSubscription;
+using PSSubscriptionRef = std::shared_ptr<PSSubscription>;
+
+class PSSubscription {
+ class InitCR;
+ friend class InitCR;
+ friend class RGWPSHandleObjEventCR;
+
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ PSSubConfigRef sub_conf;
+ std::shared_ptr<rgw_get_bucket_info_result> get_bucket_info_result;
+ RGWBucketInfo *bucket_info{nullptr};
+ RGWDataAccessRef data_access;
+ RGWDataAccess::BucketRef bucket;
+
+ InitCR *init_cr{nullptr};
+
+ class InitBucketLifecycleCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSConfigRef& conf;
+ LCRule rule;
+
+ int retention_days;
+
+ rgw_bucket_lifecycle_config_params lc_config;
+
+ public:
+ InitBucketLifecycleCR(RGWDataSyncEnv *_sync_env,
+ PSConfigRef& _conf,
+ RGWBucketInfo& _bucket_info,
+ std::map<string, bufferlist>& _bucket_attrs) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ conf(_conf) {
+ lc_config.bucket_info = _bucket_info;
+ lc_config.bucket_attrs = _bucket_attrs;
+ retention_days = conf->events_retention_days;
+ }
+
+ int operate() override {
+ reenter(this) {
+
+ rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);
+
+ {
+ /* maybe we already have it configured? */
+ RGWLifecycleConfiguration old_config;
+ auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
+ if (aiter != lc_config.bucket_attrs.end()) {
+ bufferlist::const_iterator iter{&aiter->second};
+ try {
+ old_config.decode(iter);
+ } catch (const buffer::error& e) {
+ ldout(cct, 0) << __func__ << "(): decode life cycle config failed" << dendl;
+ }
+ }
+
+ auto old_rules = old_config.get_rule_map();
+ for (auto ori : old_rules) {
+ auto& old_rule = ori.second;
+
+ if (old_rule.get_prefix().empty() &&
+ old_rule.get_expiration().get_days() == retention_days &&
+ old_rule.is_enabled()) {
+ ldout(sync_env->cct, 20) << "no need to set lifecycle rule on bucketi, existing rule matches config" << dendl;
+ return set_cr_done();
+ }
+ }
+ }
+
+ lc_config.config.add_rule(rule);
+ yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
+ sync_env->store,
+ lc_config));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
+
+ class InitCR : public RGWSingletonCR<bool> {
+ RGWDataSyncEnv *sync_env;
+ PSSubscriptionRef sub;
+ rgw_get_bucket_info_params get_bucket_info;
+ rgw_bucket_create_local_params create_bucket;
+ PSConfigRef& conf;
+ PSSubConfigRef& sub_conf;
+ int i;
+
+ public:
+ InitCR(RGWDataSyncEnv *_sync_env,
+ PSSubscriptionRef& _sub) : RGWSingletonCR<bool>(_sync_env->cct),
+ sync_env(_sync_env),
+ sub(_sub), conf(sub->env->conf),
+ sub_conf(sub->sub_conf) {
+ }
+
+ int operate() override {
+ reenter(this) {
+ get_bucket_info.tenant = conf->user.tenant;
+ get_bucket_info.bucket_name = sub_conf->data_bucket_name;
+ sub->get_bucket_info_result = make_shared<rgw_get_bucket_info_result>();
+
+ for (i = 0; i < 2; ++i) {
+ yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
+ sync_env->store,
+ get_bucket_info,
+ sub->get_bucket_info_result));
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to geting bucket info: " << "tenant="
+ << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+ }
+ if (retcode == 0) {
+ {
+ auto& result = sub->get_bucket_info_result;
+ sub->bucket_info = &result->bucket_info;
+
+ int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
+ return set_cr_error(ret);
+ }
+ }
+
+ yield call(new InitBucketLifecycleCR(sync_env, conf,
+ sub->get_bucket_info_result->bucket_info,
+ sub->get_bucket_info_result->attrs));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+
+ create_bucket.user_info = sub->env->data_user_info;
+ create_bucket.bucket_name = sub_conf->data_bucket_name;
+ ldout(sync_env->cct, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
+ yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
+ sync_env->store,
+ create_bucket));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to create bucket: " << "tenant="
+ << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ /* second iteration: we got -ENOENT and created a bucket */
+ }
+
+ /* failed twice on -ENOENT, unexpected */
+ ldout(sync_env->cct, 0) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
+ << " name=" << get_bucket_info.bucket_name << dendl;
+ return set_cr_error(-EIO);
+ }
+ return 0;
+ }
+ };
+
+ template<typename EventType>
+ class StoreEventCR : public RGWCoroutine {
+ RGWDataSyncEnv* const sync_env;
+ const PSSubscriptionRef sub;
+ const PSEvent<EventType> pse;
+ const string oid_prefix;
+
+ public:
+ StoreEventCR(RGWDataSyncEnv* const _sync_env,
+ const PSSubscriptionRef& _sub,
+ const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ sub(_sub),
+ pse(_event),
+ oid_prefix(sub->sub_conf->data_oid_prefix) {
+ }
+
+ int operate() override {
+ rgw_object_simple_put_params put_obj;
+ reenter(this) {
+
+ put_obj.bucket = sub->bucket;
+ put_obj.key = rgw_obj_key(oid_prefix + pse.id());
+
+ pse.format(&put_obj.data);
+
+ {
+ bufferlist bl;
+ pse.encode_event(bl);
+ bufferlist bl64;
+ bl.encode_base64(bl64);
+ put_obj.user_data = bl64.to_str();
+ }
+
+ yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
+ sync_env->store,
+ put_obj));
+ if (retcode < 0) {
+ ldpp_dout(sync_env->dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ } else {
+ ldpp_dout(sync_env->dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
+ }
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
+
+ template<typename EventType>
+ class PushEventCR : public RGWCoroutine {
+ RGWDataSyncEnv* const sync_env;
+ const EventRef<EventType> event;
+ const PSSubConfigRef& sub_conf;
+
+ public:
+ PushEventCR(RGWDataSyncEnv* const _sync_env,
+ const PSSubscriptionRef& _sub,
+ const EventRef<EventType>& _event) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ event(_event),
+ sub_conf(_sub->sub_conf) {
+ }
+
+ int operate() override {
+ reenter(this) {
+ ceph_assert(sub_conf->push_endpoint);
+ yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));
+
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "failed to push event: " << event->id <<
+ " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ ldout(sync_env->cct, 20) << "event: " << event->id <<
+ " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+ };
+
+public:
+ PSSubscription(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ PSSubConfigRef& _sub_conf) : sync_env(_sync_env),
+ env(_env),
+ sub_conf(_sub_conf),
+ data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {}
+
+ PSSubscription(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ rgw_pubsub_sub_config& user_sub_conf) : sync_env(_sync_env),
+ env(_env),
+ sub_conf(std::make_shared<PSSubConfig>()),
+ data_access(std::make_shared<RGWDataAccess>(sync_env->store)) {
+ sub_conf->from_user_conf(sync_env->cct, user_sub_conf);
+ }
+ virtual ~PSSubscription() {
+ if (init_cr) {
+ init_cr->put();
+ }
+ }
+
+ template <class C>
+ static PSSubscriptionRef get_shared(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ C& _sub_conf) {
+ auto sub = std::make_shared<PSSubscription>(_sync_env, _env, _sub_conf);
+ sub->init_cr = new InitCR(_sync_env, sub);
+ sub->init_cr->get();
+ return sub;
+ }
+
+ int call_init_cr(RGWCoroutine *caller) {
+ return init_cr->execute(caller);
+ }
+
+ template<typename EventType>
+ static RGWCoroutine *store_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
+ return new StoreEventCR<EventType>(sync_env, sub, event);
+ }
+
+ template<typename EventType>
+ static RGWCoroutine *push_event_cr(RGWDataSyncEnv* const sync_env, const PSSubscriptionRef& sub, const EventRef<EventType>& event) {
+ return new PushEventCR<EventType>(sync_env, sub, event);
+ }
+ friend class InitCR;
+};
+
+class PSManager
+{
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+
+ std::map<string, PSSubscriptionRef> subs;
+
+ class GetSubCR : public RGWSingletonCR<PSSubscriptionRef> {
+ RGWDataSyncEnv *sync_env;
+ PSManagerRef mgr;
+ rgw_user owner;
+ string sub_name;
+ string sub_id;
+ PSSubscriptionRef *ref;
+
+ PSConfigRef conf;
+
+ PSSubConfigRef sub_conf;
+ rgw_pubsub_sub_config user_sub_conf;
+
+ public:
+ GetSubCR(RGWDataSyncEnv *_sync_env,
+ PSManagerRef& _mgr,
+ const rgw_user& _owner,
+ const string& _sub_name,
+ PSSubscriptionRef *_ref) : RGWSingletonCR<PSSubscriptionRef>(_sync_env->cct),
+ sync_env(_sync_env),
+ mgr(_mgr),
+ owner(_owner),
+ sub_name(_sub_name),
+ ref(_ref),
+ conf(mgr->env->conf) {
+ }
+ ~GetSubCR() { }
+
+ int operate() override {
+ reenter(this) {
+ if (owner.empty()) {
+ if (!conf->find_sub(sub_name, &sub_conf)) {
+ ldout(sync_env->cct, 10) << "failed to find subscription config: name=" << sub_name << dendl;
+ mgr->remove_get_sub(owner, sub_name);
+ return set_cr_error(-ENOENT);
+ }
+
+ *ref = PSSubscription::get_shared(sync_env, mgr->env, sub_conf);
+ } else {
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_sub_config>;
+ yield {
+ RGWUserPubSub ups(sync_env->store, owner);
+ rgw_raw_obj obj;
+ ups.get_sub_meta_obj(sub_name, &obj);
+ bool empty_on_enoent = false;
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+ obj,
+ &user_sub_conf, empty_on_enoent));
+ }
+ if (retcode < 0) {
+ mgr->remove_get_sub(owner, sub_name);
+ return set_cr_error(retcode);
+ }
+
+ *ref = PSSubscription::get_shared(sync_env, mgr->env, user_sub_conf);
+ }
+
+ yield (*ref)->call_init_cr(this);
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << "failed to init subscription" << dendl;
+ mgr->remove_get_sub(owner, sub_name);
+ return set_cr_error(retcode);
+ }
+
+ if (owner.empty()) {
+ mgr->subs[sub_name] = *ref;
+ }
+ mgr->remove_get_sub(owner, sub_name);
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+
+ void return_result(PSSubscriptionRef *result) override {
+ ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl;
+ if (retcode >= 0) {
+ *result = *ref;
+ }
+ }
+ };
+
+ string sub_id(const rgw_user& owner, const string& sub_name) {
+ string owner_prefix;
+ if (!owner.empty()) {
+ owner_prefix = owner.to_str() + "/";
+ }
+
+ return owner_prefix + sub_name;
+ }
+
+ std::map<std::string, GetSubCR *> get_subs;
+
+ GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) {
+ return get_subs[sub_id(owner, name)];
+ }
+
+ void remove_get_sub(const rgw_user& owner, const string& name) {
+ get_subs.erase(sub_id(owner, name));
+ }
+
+ bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) {
+ auto iter = subs.find(sub_id(owner, sub_name));
+ if (iter != subs.end()) {
+ *sub = iter->second;
+ return true;
+ }
+ return false;
+ }
+
+ PSManager(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env) : sync_env(_sync_env),
+ env(_env) {}
+
+public:
+ static PSManagerRef get_shared(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env) {
+ return std::shared_ptr<PSManager>(new PSManager(_sync_env, _env));
+ }
+
+ static int call_get_subscription_cr(RGWDataSyncEnv *sync_env, PSManagerRef& mgr,
+ RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) {
+ if (mgr->find_sub_instance(owner, sub_name, ref)) {
+ /* found it! nothing to execute */
+ ldout(sync_env->cct, 20) << __func__ << "(): found sub instance" << dendl;
+ }
+ auto& gs = mgr->get_get_subs(owner, sub_name);
+ if (!gs) {
+ ldout(sync_env->cct, 20) << __func__ << "(): first get subs" << dendl;
+ gs = new GetSubCR(sync_env, mgr, owner, sub_name, ref);
+ }
+ ldout(sync_env->cct, 20) << __func__ << "(): executing get subs" << dendl;
+ return gs->execute(caller, ref);
+ }
+
+ friend class GetSubCR;
+};
+
+void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) {
+ manager = mgr;
+ conf->init_instance(realm, instance_id);
+}
+
+class RGWPSInitEnvCBCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ PSConfigRef& conf;
+
+ rgw_user_create_params create_user;
+ rgw_get_user_info_params get_user_info;
+public:
+ RGWPSInitEnvCBCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef& _env) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env), conf(env->conf) {}
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 0) << ": init pubsub config zone=" << sync_env->source_zone << dendl;
+
+ /* nothing to do here right now */
+ create_user.user = conf->user;
+ create_user.max_buckets = 0; /* unlimited */
+ create_user.display_name = "pubsub";
+ create_user.generate_key = false;
+ yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user));
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ get_user_info.user = conf->user;
+ yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info));
+ if (retcode < 0) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to create rgw user: ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+
+ ldout(sync_env->cct, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl;
+
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, rgw::notify::EventType event_type) {
+ if (!match(filter.events, event_type)) {
+ return false;
+ }
+ if (!match(filter.s3_filter.key_filter, key_name)) {
+ return false;
+ }
+ return true;
+}
+
+class RGWPSFindBucketTopicsCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ rgw_user owner;
+ rgw_bucket bucket;
+ rgw_obj_key key;
+ rgw::notify::EventType event_type;
+
+ RGWUserPubSub ups;
+
+ rgw_raw_obj bucket_obj;
+ rgw_raw_obj user_obj;
+ rgw_pubsub_bucket_topics bucket_topics;
+ rgw_pubsub_user_topics user_topics;
+ TopicsRef *topics;
+public:
+ RGWPSFindBucketTopicsCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef& _env,
+ const rgw_user& _owner,
+ const rgw_bucket& _bucket,
+ const rgw_obj_key& _key,
+ rgw::notify::EventType _event_type,
+ TopicsRef *_topics) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ owner(_owner),
+ bucket(_bucket),
+ key(_key),
+ event_type(_event_type),
+ ups(_sync_env->store, owner),
+ topics(_topics) {
+ *topics = std::make_shared<vector<PSTopicConfigRef> >();
+ }
+ int operate() override {
+ reenter(this) {
+ ups.get_bucket_meta_obj(bucket, &bucket_obj);
+ ups.get_user_meta_obj(&user_obj);
+
+ using ReadInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_bucket_topics>;
+ yield {
+ bool empty_on_enoent = true;
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+ bucket_obj,
+ &bucket_topics, empty_on_enoent));
+ }
+ if (retcode < 0 && retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+
+ ldout(sync_env->cct, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;
+
+ if (!bucket_topics.topics.empty()) {
+ using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR<rgw_pubsub_user_topics>;
+ yield {
+ bool empty_on_enoent = true;
+ call(new ReadUserTopicsInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
+ user_obj,
+ &user_topics, empty_on_enoent));
+ }
+ if (retcode < 0 && retcode != -ENOENT) {
+ return set_cr_error(retcode);
+ }
+ }
+
+ for (auto& titer : bucket_topics.topics) {
+ auto& topic_filter = titer.second;
+ auto& info = topic_filter.topic;
+ if (!match(topic_filter, key.name, event_type)) {
+ continue;
+ }
+ std::shared_ptr<PSTopicConfig> tc = std::make_shared<PSTopicConfig>();
+ tc->name = info.name;
+ tc->subs = user_topics.topics[info.name].subs;
+ tc->opaque_data = info.opaque_data;
+ (*topics)->push_back(tc);
+ }
+
+ env->conf->get_topics(sync_env->cct, bucket, key, topics);
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+class RGWPSHandleObjEventCR : public RGWCoroutine {
+ RGWDataSyncEnv* const sync_env;
+ const PSEnvRef env;
+ const rgw_user& owner;
+ const EventRef<rgw_pubsub_event> event;
+ const EventRef<rgw_pubsub_s3_record> record;
+ const TopicsRef topics;
+ const std::array<rgw_user, 2> owners;
+ bool has_subscriptions;
+ bool event_handled;
+ bool sub_conf_found;
+ PSSubscriptionRef sub;
+ std::array<rgw_user, 2>::const_iterator oiter;
+ std::vector<PSTopicConfigRef>::const_iterator titer;
+ std::set<std::string>::const_iterator siter;
+ int last_sub_conf_error;
+
+public:
+ RGWPSHandleObjEventCR(RGWDataSyncEnv* const _sync_env,
+ const PSEnvRef _env,
+ const rgw_user& _owner,
+ const EventRef<rgw_pubsub_event>& _event,
+ const EventRef<rgw_pubsub_s3_record>& _record,
+ const TopicsRef& _topics) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ owner(_owner),
+ event(_event),
+ record(_record),
+ topics(_topics),
+ owners({owner, rgw_user{}}),
+ has_subscriptions(false),
+ event_handled(false) {}
+
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 20) << ": handle event: obj: z=" << sync_env->source_zone
+ << " event=" << json_str("event", *event, false)
+ << " owner=" << owner << dendl;
+
+ ldout(sync_env->cct, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;
+
+ // outside caller should check that
+ ceph_assert(!topics->empty());
+
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
+
+ // loop over all topics related to the bucket/object
+ for (titer = topics->begin(); titer != topics->end(); ++titer) {
+ ldout(sync_env->cct, 20) << ": notification for " << event->source << ": topic=" <<
+ (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
+ // loop over all subscriptions of the topic
+ for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
+ ldout(sync_env->cct, 20) << ": subscription: " << *siter << dendl;
+ has_subscriptions = true;
+ sub_conf_found = false;
+ // try to read subscription configuration from global/user cond
+ // configuration is considered missing only if does not exist in either
+ for (oiter = owners.begin(); oiter != owners.end(); ++oiter) {
+ yield PSManager::call_get_subscription_cr(sync_env, env->manager, this, *oiter, *siter, &sub);
+ if (retcode < 0) {
+ if (sub_conf_found) {
+ // not a real issue, sub conf already found
+ retcode = 0;
+ }
+ last_sub_conf_error = retcode;
+ continue;
+ }
+ sub_conf_found = true;
+ if (sub->sub_conf->s3_id.empty()) {
+ // subscription was not made by S3 compatible API
+ ldout(sync_env->cct, 20) << "storing event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::store_event_cr(sync_env, sub, event));
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+ ldout(sync_env->cct, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ event_handled = true;
+ }
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sync_env->cct, 20) << "push event for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sync_env, sub, event));
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ ldout(sync_env->cct, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+ event_handled = true;
+ }
+ }
+ } else {
+ // subscription was made by S3 compatible API
+ ldout(sync_env->cct, 20) << "storing record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ record->configurationId = sub->sub_conf->s3_id;
+ record->opaque_data = (*titer)->opaque_data;
+ yield call(PSSubscription::store_event_cr(sync_env, sub, record));
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
+ ldout(sync_env->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
+ event_handled = true;
+ }
+ if (sub->sub_conf->push_endpoint) {
+ ldout(sync_env->cct, 20) << "push record for subscription=" << *siter << " owner=" << *oiter << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sync_env, sub, record));
+ if (retcode < 0) {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
+ ldout(sync_env->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
+ } else {
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
+ event_handled = true;
+ }
+ }
+ }
+ }
+ if (!sub_conf_found) {
+ // could not find conf for subscription at user or global levels
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
+ ldout(sync_env->cct, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
+ << " ret=" << last_sub_conf_error << dendl;
+ if (retcode == -ENOENT) {
+ // missing subscription info should be reflected back as invalid argument
+ // and not as missing object
+ retcode = -EINVAL;
+ }
+ }
+ }
+ }
+ if (has_subscriptions && !event_handled) {
+ // event is considered "lost" of it has subscriptions on any of its topics
+ // but it was not stored in, or pushed to, any of them
+ if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+// coroutine invoked on remote object creation
+class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ std::optional<uint64_t> versioned_epoch;
+ EventRef<rgw_pubsub_event> event;
+ EventRef<rgw_pubsub_s3_record> record;
+ TopicsRef topics;
+public:
+ RGWPSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
+ TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ sync_env(_sync_env),
+ env(_env),
+ versioned_epoch(_versioned_epoch),
+ topics(_topics) {
+ }
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 20) << ": stat of remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
+ << " attrs=" << attrs << dendl;
+ {
+ std::vector<std::pair<std::string, std::string> > attrs;
+ for (auto& attr : attrs) {
+ std::string k = attr.first;
+ if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
+ k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
+ }
+ attrs.push_back(std::make_pair(k, attr.second));
+ }
+ // at this point we don't know whether we need the ceph event or S3 record
+ // this is why both are created here, once we have information about the
+ // subscription, we will store/push only the relevant ones
+ make_event_ref(sync_env->cct,
+ bucket_info.bucket, key,
+ mtime, &attrs,
+ rgw::notify::ObjectCreated, &event);
+ make_s3_record_ref(sync_env->cct,
+ bucket_info.bucket, bucket_info.owner, key,
+ mtime, &attrs,
+ rgw::notify::ObjectCreated, &record);
+ }
+
+ yield call(new RGWPSHandleObjEventCR(sync_env, env, bucket_info.owner, event, record, topics));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ PSEnvRef env;
+ std::optional<uint64_t> versioned_epoch;
+ TopicsRef topics;
+public:
+ RGWPSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ PSEnvRef _env, std::optional<uint64_t> _versioned_epoch,
+ TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ env(_env), versioned_epoch(_versioned_epoch),
+ topics(_topics) {
+ }
+
+ ~RGWPSHandleRemoteObjCR() override {}
+
+ RGWStatRemoteObjCBCR *allocate_callback() override {
+ return new RGWPSHandleRemoteObjCBCR(sync_env, bucket_info, key, env, versioned_epoch, topics);
+ }
+};
+
+class RGWPSHandleObjCreateCR : public RGWCoroutine {
+
+ RGWDataSyncEnv *sync_env;
+ RGWBucketInfo bucket_info;
+ rgw_obj_key key;
+ PSEnvRef env;
+ std::optional<uint64_t> versioned_epoch;
+ TopicsRef topics;
+public:
+ RGWPSHandleObjCreateCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
+ PSEnvRef _env, std::optional<uint64_t> _versioned_epoch) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ bucket_info(_bucket_info),
+ key(_key),
+ env(_env),
+ versioned_epoch(_versioned_epoch) {
+ }
+
+ ~RGWPSHandleObjCreateCR() override {}
+
+ int operate() override {
+ reenter(this) {
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, bucket_info.owner,
+ bucket_info.bucket, key,
+ rgw::notify::ObjectCreated,
+ &topics));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (topics->empty()) {
+ ldout(sync_env->cct, 20) << "no topics found for " << bucket_info.bucket << "/" << key << dendl;
+ return set_cr_done();
+ }
+ yield call(new RGWPSHandleRemoteObjCR(sync_env, bucket_info, key, env, versioned_epoch, topics));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
+// coroutine invoked on remote object deletion
+class RGWPSGenericObjEventCBCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ PSEnvRef env;
+ rgw_user owner;
+ rgw_bucket bucket;
+ rgw_obj_key key;
+ ceph::real_time mtime;
+ rgw::notify::EventType event_type;
+ EventRef<rgw_pubsub_event> event;
+ EventRef<rgw_pubsub_s3_record> record;
+ TopicsRef topics;
+public:
+ RGWPSGenericObjEventCBCR(RGWDataSyncEnv *_sync_env,
+ PSEnvRef _env,
+ RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ rgw::notify::EventType _event_type) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ env(_env),
+ owner(_bucket_info.owner),
+ bucket(_bucket_info.bucket),
+ key(_key),
+ mtime(_mtime), event_type(_event_type) {}
+ int operate() override {
+ reenter(this) {
+ ldout(sync_env->cct, 20) << ": remove remote obj: z=" << sync_env->source_zone
+ << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl;
+ yield call(new RGWPSFindBucketTopicsCR(sync_env, env, owner, bucket, key, event_type, &topics));
+ if (retcode < 0) {
+ ldout(sync_env->cct, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ if (topics->empty()) {
+ ldout(sync_env->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
+ return set_cr_done();
+ }
+ // at this point we don't know whether we need the ceph event or S3 record
+ // this is why both are created here, once we have information about the
+ // subscription, we will store/push only the relevant ones
+ make_event_ref(sync_env->cct,
+ bucket, key,
+ mtime, nullptr,
+ event_type, &event);
+ make_s3_record_ref(sync_env->cct,
+ bucket, owner, key,
+ mtime, nullptr,
+ event_type, &record);
+ yield call(new RGWPSHandleObjEventCR(sync_env, env, owner, event, record, topics));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+
+};
+
+class RGWPSDataSyncModule : public RGWDataSyncModule {
+ PSEnvRef env;
+ PSConfigRef& conf;
+
+public:
+ RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared<PSEnv>()), conf(env->conf) {
+ env->init(cct, config);
+ }
+
+ ~RGWPSDataSyncModule() override {}
+
+ void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
+ PSManagerRef mgr = PSManager::get_shared(sync_env, env);
+ env->init_instance(sync_env->store->svc.zone->get_realm(), instance_id, mgr);
+ }
+
+ RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
+ ldout(sync_env->cct, 5) << conf->id << ": start" << dendl;
+ return new RGWPSInitEnvCBCR(sync_env, env);
+ }
+
+ RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket <<
+ " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+ return new RGWPSHandleObjCreateCR(sync_env, bucket_info, key, env, versioned_epoch);
+ }
+
+ RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket <<
+ " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDelete);
+ }
+
+ RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info,
+ rgw_obj_key& key, real_time& mtime, rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket <<
+ " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ return new RGWPSGenericObjEventCBCR(sync_env, env, bucket_info, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated);
+ }
+
+ PSConfigRef& get_conf() { return conf; }
+};
+
+RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config)
+{
+ data_handler = std::unique_ptr<RGWPSDataSyncModule>(new RGWPSDataSyncModule(cct, config));
+ const std::string jconf = json_str("conf", *data_handler->get_conf());
+ JSONParser p;
+ if (!p.parse(jconf.c_str(), jconf.size())) {
+ ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl;
+ effective_conf = config;
+ } else {
+ effective_conf.decode_json(&p);
+ }
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ if (!rgw::amqp::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to initialize AMQP manager in pubsub sync module" << dendl;
+ }
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ if (!rgw::kafka::init(cct)) {
+ ldout(cct, 1) << "ERROR: failed to initialize Kafka manager in pubsub sync module" << dendl;
+ }
+#endif
+}
+
+RGWPSSyncModuleInstance::~RGWPSSyncModuleInstance() {
+#ifdef WITH_RADOSGW_AMQP_ENDPOINT
+ rgw::amqp::shutdown();
+#endif
+#ifdef WITH_RADOSGW_KAFKA_ENDPOINT
+ rgw::kafka::shutdown();
+#endif
+}
+
+RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler()
+{
+ return data_handler.get();
+}
+
+RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) {
+ if (dialect != RGW_REST_S3) {
+ return orig;
+ }
+ return new RGWRESTMgr_PubSub();
+}
+
+bool RGWPSSyncModuleInstance::should_full_sync() const {
+ return data_handler->get_conf()->start_with_full_sync;
+}
+
+int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
+ instance->reset(new RGWPSSyncModuleInstance(cct, config));
+ return 0;
+}
+
+