// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab ft=cpp #include "services/svc_zone.h" #include "rgw_common.h" #include "rgw_coroutine.h" #include "rgw_sync_module.h" #include "rgw_data_sync.h" #include "rgw_sync_module_pubsub.h" #include "rgw_sync_module_pubsub_rest.h" #include "rgw_rest_conn.h" #include "rgw_cr_rados.h" #include "rgw_cr_rest.h" #include "rgw_cr_tools.h" #include "rgw_op.h" #include "rgw_pubsub.h" #include "rgw_pubsub_push.h" #include "rgw_notify_event_type.h" #include "rgw_perf_counters.h" #include #include #define dout_subsys ceph_subsys_rgw #define PUBSUB_EVENTS_RETENTION_DEFAULT 7 /* config: { "tenant": , # default: "uid": , # default: "pubsub" "data_bucket_prefix": # default: "pubsub-" "data_oid_prefix": # "events_retention_days": # default: 7 "start_with_full_sync" # default: false } */ // utility function to convert the args list from string format // (ampresend separated with equal sign) to prased structure RGWHTTPArgs string_to_args(const std::string& str_args, const DoutPrefixProvider *dpp) { RGWHTTPArgs args; args.set(str_args); args.parse(dpp); return args; } struct PSSubConfig { std::string name; std::string topic; std::string push_endpoint_name; std::string push_endpoint_args; std::string data_bucket_name; std::string data_oid_prefix; std::string s3_id; std::string arn_topic; RGWPubSubEndpoint::Ptr push_endpoint; void from_user_conf(CephContext *cct, const rgw_pubsub_sub_config& uc, const DoutPrefixProvider *dpp) { name = uc.name; topic = uc.topic; push_endpoint_name = uc.dest.push_endpoint; data_bucket_name = uc.dest.bucket_name; data_oid_prefix = uc.dest.oid_prefix; s3_id = uc.s3_id; arn_topic = uc.dest.arn_topic; if (!push_endpoint_name.empty()) { push_endpoint_args = uc.dest.push_endpoint_args; try { push_endpoint = RGWPubSubEndpoint::create(push_endpoint_name, arn_topic, string_to_args(push_endpoint_args, dpp), cct); ldpp_dout(dpp, 20) << "push endpoint created: " << 
// (continues PSSubConfig::from_user_conf: right-hand operand of the pending "<<")
          push_endpoint->to_str() << dendl;
      } catch (const RGWPubSubEndpoint::configuration_error& e) {
        // a bad endpoint configuration is logged and otherwise ignored;
        // push_endpoint is not assigned in that case
        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
          << push_endpoint_name << " due to: " << e.what() << dendl;
      }
    }
  }

  // JSON dump of the string fields (arn_topic and the endpoint object are not emitted)
  void dump(Formatter *f) const {
    encode_json("name", name, f);
    encode_json("topic", topic, f);
    encode_json("push_endpoint", push_endpoint_name, f);
    encode_json("push_endpoint_args", push_endpoint_args, f);
    encode_json("data_bucket_name", data_bucket_name, f);
    encode_json("data_oid_prefix", data_oid_prefix, f);
    encode_json("s3_id", s3_id, f);
  }
};

// NOTE(review): template arguments are missing throughout this copy of the
// file (e.g. the alias below); restore them from the canonical source.
using PSSubConfigRef = std::shared_ptr;

// A topic together with the names of its subscriptions and its opaque data.
struct PSTopicConfig {
  std::string name;
  std::set subs;
  std::string opaque_data;

  void dump(Formatter *f) const {
    encode_json("name", name, f);
    encode_json("subs", subs, f);
    encode_json("opaque", opaque_data, f);
  }
};

// A notification rule: a path (or path prefix) bound to a topic.
struct PSNotificationConfig {
  uint64_t id{0};
  string path; /* a path or a path prefix that would trigger the event (prefix: if ends with a wildcard) */
  string topic;
  bool is_prefix{false};

  void dump(Formatter *f) const {
    encode_json("id", id, f);
    encode_json("path", path, f);
    encode_json("topic", topic, f);
    encode_json("is_prefix", is_prefix, f);
  }

  // Read "path" and "topic" from config; a trailing '*' is stripped from the
  // path and recorded as is_prefix = true.
  void init(CephContext *cct, const JSONFormattable& config) {
    path = config["path"];
    if (!path.empty() && path[path.size() - 1] == '*') {
      path = path.substr(0, path.size() - 1);
      is_prefix = true;
    }
    topic = config["topic"];
  }
};

// Encode `obj` under `name` with a JSONFormatter (optionally pretty-printed)
// and return the flushed text.
template
static string json_str(const char *name, const T& obj, bool pretty = false)
{
  stringstream ss;
  JSONFormatter f(pretty);

  encode_json(name, obj, &f);
  f.flush(ss);

  return ss.str();
}

using PSTopicConfigRef = std::shared_ptr;
using TopicsRef = std::shared_ptr>;

// global pubsub configuration
struct PSConfig {
  const std::string id{"pubsub"};
  rgw_user user;
  std::string data_bucket_prefix;
  std::string data_oid_prefix;

  int events_retention_days{0};

  uint64_t sync_instance{0};
  bool start_with_full_sync{false};

  void dump(Formatter *f) const {
    encode_json("id", id, f);
    encode_json("user",
// (continues PSConfig::dump: remaining arguments of the pending encode_json call)
                user, f);
    encode_json("data_bucket_prefix", data_bucket_prefix, f);
    encode_json("data_oid_prefix", data_oid_prefix, f);
    encode_json("events_retention_days", events_retention_days, f);
    encode_json("sync_instance", sync_instance, f);
    encode_json("start_with_full_sync", start_with_full_sync, f);
  }

  // Populate the module configuration from `config` and log the parsed result.
  // The value passed in the call-operator form config["key"](x) is presumably
  // the fallback used when the key is absent -- confirm against JSONFormattable.
  void init(CephContext *cct, const JSONFormattable& config) {
    string uid = config["uid"]("pubsub");
    user = rgw_user(config["tenant"], uid);
    data_bucket_prefix = config["data_bucket_prefix"]("pubsub-");
    data_oid_prefix = config["data_oid_prefix"];
    events_retention_days = config["events_retention_days"](PUBSUB_EVENTS_RETENTION_DEFAULT);
    start_with_full_sync = config["start_with_full_sync"](false);

    ldout(cct, 20) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
  }

  // Record the sync instance id; `realm` is not used here.
  void init_instance(const RGWRealm& realm, uint64_t instance_id) {
    sync_instance = instance_id;
  }
};

// NOTE(review): template parameter/argument lists are missing in this copy
// of the file (see the two declarations below).
using PSConfigRef = std::shared_ptr;
template
using EventRef = std::shared_ptr;

// Lightweight view of an object-store event used for hashing and JSON output.
// All data members except `id` are references/pointers to caller-owned
// objects, so those objects must outlive this struct.
struct objstore_event {
  string id;
  const rgw_bucket& bucket;
  const rgw_obj_key& key;
  const ceph::real_time& mtime;
  const std::vector > *attrs;

  objstore_event(const rgw_bucket& _bucket,
                 const rgw_obj_key& _key,
                 const ceph::real_time& _mtime,
                 const std::vector > *_attrs) : bucket(_bucket),
                                                key(_key),
                                                mtime(_mtime),
                                                attrs(_attrs) {}

  // Digest of bucket id + key name + key instance (via RGWMD5Etag),
  // truncated to its first 8 characters.
  string get_hash() {
    string etag;
    RGWMD5Etag hash;
    hash.update(bucket.bucket_id);
    hash.update(key.name);
    hash.update(key.instance);
    hash.finish(&etag);

    assert(etag.size() > 8);

    return etag.substr(0, 8);
  }

  // Emit "bucket" and "key" sections, "mtime", and an "attrs" section holding
  // one entry per attribute pair (when attrs is non-null).
  void dump(Formatter *f) const {
    {
      Formatter::ObjectSection s(*f, "bucket");
      encode_json("name", bucket.name, f);
      encode_json("tenant", bucket.tenant, f);
      encode_json("bucket_id", bucket.bucket_id, f);
    }
    {
      Formatter::ObjectSection s(*f, "key");
      encode_json("name", key.name, f);
      encode_json("instance", key.instance, f);
    }
    utime_t mt(mtime);
    encode_json("mtime", mt, f);
    Formatter::ObjectSection s(*f, "attrs");
    if (attrs) {
      for (auto& attr : *attrs) {
// (continues objstore_event::dump: body of the loop over *attrs)
        encode_json(attr.first.c_str(), attr.second.c_str(), f);
      }
    }
  }
};

// Allocate a new event into *event and fill it in: event name from
// event_type, source "<bucket>/<key>", timestamp = now, id derived from the
// objstore_event hash and that timestamp, and "info" = JSON of the
// objstore_event. `cct` is not used in the body.
static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
                           const rgw_obj_key& key,
                           const ceph::real_time& mtime,
                           const std::vector > *attrs,
                           rgw::notify::EventType event_type,
                           EventRef *event) {
  *event = std::make_shared();

  EventRef& e = *event;
  e->event_name = rgw::notify::to_ceph_string(event_type);
  e->source = bucket.name + "/" + key.name;
  e->timestamp = real_clock::now();

  objstore_event oevent(bucket, key, mtime, attrs);

  const utime_t ts(e->timestamp);
  set_event_id(e->id, oevent.get_hash(), ts);

  encode_json("info", oevent, &e->info);
}

// Allocate a new S3-style event into *event and fill in the fields that this
// function can provide; the fields listed in the comments below are left
// untouched here. `cct` is not used in the body.
static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
                              const rgw_user& owner,
                              const rgw_obj_key& key,
                              const ceph::real_time& mtime,
                              const std::vector>* attrs,
                              rgw::notify::EventType event_type,
                              EventRef* event) {
  *event = std::make_shared();

  EventRef& e = *event;
  e->eventTime = mtime;
  e->eventName = rgw::notify::to_event_string(event_type);
  // userIdentity: not supported in sync module
  // x_amz_request_id: not supported in sync module
  // x_amz_id_2: not supported in sync module
  // configurationId is filled from subscription configuration
  e->bucket_name = bucket.name;
  e->bucket_ownerIdentity = owner.to_str();
  e->bucket_arn = to_string(rgw::ARN(bucket));
  e->bucket_id = bucket.bucket_id; // rgw extension
  e->object_key = key.name;
  // object_size not supported in sync module
  objstore_event oevent(bucket, key, mtime, attrs);
  // the "etag" is the truncated objstore_event hash computed here
  e->object_etag = oevent.get_hash();
  e->object_versionId = key.instance;
  // use timestamp as per key sequence id (hex encoded)
  // (the bytes of the utime_t object itself are hex-encoded)
  const utime_t ts(real_clock::now());
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
                        std::back_inserter(e->object_sequencer));

  set_event_id(e->id, e->object_etag, ts);
}

class PSManager;

// NOTE(review): template arguments are missing in this copy of the file
// (alias below, shared_ptr member and make_shared calls in PSEnv).
using PSManagerRef = std::shared_ptr;

// Shared per-module environment: configuration, data user info and manager.
struct PSEnv {
  PSConfigRef conf;
  shared_ptr data_user_info;
  PSManagerRef manager;

  PSEnv() : conf(make_shared()),
            data_user_info(make_shared()) {}
// (continues struct PSEnv)

  // forward the JSON configuration to the shared PSConfig
  void init(CephContext *cct, const JSONFormattable& config) {
    conf->init(cct, config);
  }

  void init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr);
};

// NOTE(review): template parameter/argument lists are missing in this copy
// of the file (alias below and the two class templates that follow).
using PSEnvRef = std::shared_ptr;

// Read-only wrapper around an event reference: JSON formatting, binary
// encoding and access to the event id.
template
class PSEvent {
  const EventRef event;

public:
  PSEvent(const EventRef& _event) : event(_event) {}

  // append the JSON form of the event (encoded under an empty name) to *bl
  void format(bufferlist *bl) const {
    bl->append(json_str("", *event));
  }

  // append the encode() form of the event to bl
  void encode_event(bufferlist& bl) const {
    encode(*event, bl);
  }

  const string& id() const {
    return event->id;
  }
};

// Coroutine whose operate() runs once on behalf of any number of callers:
// the first caller of execute() starts it, later callers are parked as
// waiters and are woken by operate_wrapper() once the coroutine is done.
template
class RGWSingletonCR : public RGWCoroutine {
  friend class WrapperCR;

  boost::asio::coroutine wrapper_state;
  bool started{false};
  int operate_ret{0};

  // a parked caller and where to store its result
  struct WaiterInfo {
    RGWCoroutine *cr{nullptr};
    T *result;
  };
  using WaiterInfoRef = std::shared_ptr;

  deque waiters;

  void add_waiter(RGWCoroutine *cr, T *result) {
    auto waiter = std::make_shared();
    waiter->cr = cr;
    waiter->result = result;
    waiters.push_back(waiter);
  };

  // pop the oldest waiter into *waiter; returns false (and resets *waiter)
  // when none are left
  bool get_next_waiter(WaiterInfoRef *waiter) {
    if (waiters.empty()) {
      waiter->reset();
      return false;
    }

    *waiter = waiters.front();
    waiters.pop_front();
    return true;
  }

  // Drive operate() until is_done(), yielding between calls, then hand the
  // final retcode (and result, via return_result()) to every parked waiter.
  int operate_wrapper(const DoutPrefixProvider *dpp) override {
    reenter(&wrapper_state) {
      while (!is_done()) {
        ldpp_dout(dpp, 20) << __func__ << "(): operate_wrapper() -> operate()" << dendl;
        operate_ret = operate(dpp);
        if (operate_ret < 0) {
          ldpp_dout(dpp, 20) << *this << ": operate() returned r=" << operate_ret << dendl;
        }
        if (!is_done()) {
          yield;
        }
      }

      ldpp_dout(dpp, 20) << __func__ << "(): RGWSingletonCR: operate_wrapper() done, need to wake up " << waiters.size() << " waiters" << dendl;
      /* we're done, can't yield anymore */

      WaiterInfoRef waiter;
      while (get_next_waiter(&waiter)) {
        ldout(cct, 20) << __func__ << "(): RGWSingletonCR: waking up waiter" << dendl;
        waiter->cr->set_retcode(retcode);
        waiter->cr->set_sleeping(false);
        return_result(waiter->result);
        // one put() per woken waiter (execute() does a get() when it parks one)
        put();
      }

      return retcode;
    }
    return 0;
  }

  // hook for subclasses to copy their result into *result; no-op by default
  virtual void return_result(T *result) {}

public:
  RGWSingletonCR(CephContext *_cct)
    : RGWCoroutine(_cct)
// (continues RGWSingletonCR: empty body of the constructor declared just before)
  {}

  // Entry point for callers.
  //  - not started yet: mark started and have `caller` call this coroutine
  //  - started but not done: take a reference, park `caller` as a waiter and
  //    put it to sleep (it is woken by operate_wrapper())
  //  - already done: hand retcode/result to `caller` immediately
  // Returns 0 in the first two cases and retcode in the last one.
  int execute(RGWCoroutine *caller, T *result = nullptr) {
    if (!started) {
      ldout(cct, 20) << __func__ << "(): singleton not started, starting" << dendl;
      started = true;
      caller->call(this);
      return 0;
    } else if (!is_done()) {
      ldout(cct, 20) << __func__ << "(): singleton not done yet, registering as waiter" << dendl;
      get();
      add_waiter(caller, result);
      caller->set_sleeping(true);
      return 0;
    }

    ldout(cct, 20) << __func__ << "(): singleton done, returning retcode=" << retcode << dendl;
    caller->set_retcode(retcode);
    return_result(result);
    return retcode;
  }
};


class PSSubscription;
// NOTE(review): template arguments are missing throughout this copy of the
// file (aliases, shared_ptr/std::map members, class-template headers).
using PSSubscriptionRef = std::shared_ptr;

// A subscription instance: its configuration plus the data bucket that
// events are stored in, and the coroutines that initialise the bucket and
// store/push events.
class PSSubscription {
  class InitCR;
  friend class InitCR;
  friend class RGWPSHandleObjEventCR;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;
  PSSubConfigRef sub_conf;
  std::shared_ptr get_bucket_info_result;
  RGWBucketInfo *bucket_info{nullptr};
  RGWDataAccessRef data_access;
  RGWDataAccess::BucketRef bucket;

  InitCR *init_cr{nullptr};

  // Makes sure the data bucket carries an expiration rule matching
  // events_retention_days.
  class InitBucketLifecycleCR : public RGWCoroutine {
    RGWDataSyncCtx *sc;
    RGWDataSyncEnv *sync_env;
    PSConfigRef& conf;
    LCRule rule;

    int retention_days;

    rgw_bucket_lifecycle_config_params lc_config;

  public:
    InitBucketLifecycleCR(RGWDataSyncCtx *_sc,
           PSConfigRef& _conf,
           RGWBucketInfo& _bucket_info,
           std::map& _bucket_attrs) : RGWCoroutine(_sc->cct),
                                      sc(_sc), sync_env(_sc->env),
                                      conf(_conf) {
      lc_config.bucket_info = _bucket_info;
      lc_config.bucket_attrs = _bucket_attrs;
      retention_days = conf->events_retention_days;
    }

    int operate(const DoutPrefixProvider *dpp) override {
      reenter(this) {

        rule.init_simple_days_rule("Pubsub Expiration", "" /* all objects in bucket */, retention_days);

        {
          /* maybe we already have it configured? */
          RGWLifecycleConfiguration old_config;
          auto aiter = lc_config.bucket_attrs.find(RGW_ATTR_LC);
          if (aiter != lc_config.bucket_attrs.end()) {
            bufferlist::const_iterator iter{&aiter->second};
            try {
              old_config.decode(iter);
            } catch (const buffer::error& e) {
              // a decode failure is only logged; processing continues with
              // whatever old_config holds at that point
              ldpp_dout(dpp, 0) << __func__ <<  "(): decode life cycle config failed" << dendl;
            }
          }

          // done early if an enabled rule with an empty prefix and the same
          // number of days already exists
          auto old_rules = old_config.get_rule_map();
          for (auto ori : old_rules) {
            auto& old_rule = ori.second;

            if (old_rule.get_prefix().empty() &&
                old_rule.get_expiration().get_days() == retention_days &&
                old_rule.is_enabled()) {
              ldpp_dout(dpp, 20) << "no need to set lifecycle rule on bucket, existing rule matches config" << dendl;
              return set_cr_done();
            }
          }
        }

        lc_config.config.add_rule(rule);
        yield call(new RGWBucketLifecycleConfigCR(sync_env->async_rados,
                                                  sync_env->store,
                                                  lc_config,
                                                  dpp));
        if (retcode < 0) {
          ldpp_dout(dpp, 1) << "ERROR: failed to set lifecycle on bucket: ret=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        return set_cr_done();
      }
      return 0;
    }
  };

  // One-shot initialisation of the subscription's data bucket: look the
  // bucket up, create it if the lookup did not succeed, then set up its
  // lifecycle rule.
  class InitCR : public RGWSingletonCR {
    RGWDataSyncCtx *sc;
    RGWDataSyncEnv *sync_env;
    PSSubscriptionRef sub;
    rgw_get_bucket_info_params get_bucket_info;
    rgw_bucket_create_local_params create_bucket;
    PSConfigRef& conf;
    PSSubConfigRef& sub_conf;
    int i;

  public:
    InitCR(RGWDataSyncCtx *_sc,
           PSSubscriptionRef& _sub) : RGWSingletonCR(_sc->cct),
                                      sc(_sc), sync_env(_sc->env),
                                      sub(_sub), conf(sub->env->conf),
                                      sub_conf(sub->sub_conf) {
    }

    int operate(const DoutPrefixProvider *dpp) override {
      reenter(this) {
        get_bucket_info.tenant = conf->user.tenant;
        get_bucket_info.bucket_name = sub_conf->data_bucket_name;
        sub->get_bucket_info_result = make_shared();

        // at most two passes: lookup, and lookup again after creating the bucket
        for (i = 0; i < 2; ++i) {
          yield call(new RGWGetBucketInfoCR(sync_env->async_rados,
                                            sync_env->store,
                                            get_bucket_info,
                                            sub->get_bucket_info_result,
                                            dpp));
          // NOTE(review): an error other than -ENOENT is only logged here;
          // control still falls through to the bucket-creation path below.
          // Confirm whether returning the error was intended.
          if (retcode < 0 && retcode != -ENOENT) {
            ldpp_dout(dpp, 1) << "ERROR: failed to geting bucket info: " << "tenant="
              << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
          }
          if (retcode == 0) {
            {
              auto& result = sub->get_bucket_info_result;
              sub->bucket_info = &result->bucket_info;

              int ret = sub->data_access->get_bucket(result->bucket_info, result->attrs, &sub->bucket);
              if (ret < 0) {
                ldpp_dout(dpp, 1) << "ERROR: data_access.get_bucket() bucket=" << result->bucket_info.bucket << " failed, ret=" << ret << dendl;
                return set_cr_error(ret);
              }
            }

            yield call(new InitBucketLifecycleCR(sc, conf,
                                                 sub->get_bucket_info_result->bucket_info,
                                                 sub->get_bucket_info_result->attrs));
            if (retcode < 0) {
              ldpp_dout(dpp, 1) << "ERROR: failed to init lifecycle on bucket (bucket=" << sub_conf->data_bucket_name << ") ret=" << retcode << dendl;
              return set_cr_error(retcode);
            }

            return set_cr_done();
          }

          create_bucket.user_info = sub->env->data_user_info;
          create_bucket.bucket_name = sub_conf->data_bucket_name;
          ldpp_dout(dpp, 20) << "pubsub: bucket create: using user info: " << json_str("obj", *sub->env->data_user_info, true) << dendl;
          yield call(new RGWBucketCreateLocalCR(sync_env->async_rados,
                                                sync_env->store,
                                                create_bucket,
                                                dpp));
          if (retcode < 0) {
            ldpp_dout(dpp, 1) << "ERROR: failed to create bucket: " << "tenant="
              << get_bucket_info.tenant << " name=" << get_bucket_info.bucket_name << ": ret=" << retcode << dendl;
            return set_cr_error(retcode);
          }

          /* second iteration: we got -ENOENT and created a bucket */
        }

        /* failed twice on -ENOENT, unexpected */
        ldpp_dout(dpp, 1) << "ERROR: failed to create bucket " << "tenant=" << get_bucket_info.tenant
          << " name=" << get_bucket_info.bucket_name << dendl;
        return set_cr_error(-EIO);
      }
      return 0;
    }
  };

  // Stores one event as an object in the subscription's data bucket.
  template
  class StoreEventCR : public RGWCoroutine {
    RGWDataSyncCtx* const sc;
    RGWDataSyncEnv* const sync_env;
    const PSSubscriptionRef sub;
    const PSEvent pse;
    const string oid_prefix;

  public:
    StoreEventCR(RGWDataSyncCtx* const _sc,
               const PSSubscriptionRef& _sub,
               const EventRef& _event) : RGWCoroutine(_sc->cct),
                                         sc(_sc), sync_env(_sc->env),
                                         sub(_sub),
                                         pse(_event),
                                         oid_prefix(sub->sub_conf->data_oid_prefix)
// (continues PSSubscription::StoreEventCR: empty body of its constructor)
    {
    }

    // Write the event to <oid_prefix><event id> in the subscription bucket:
    // object data is the JSON form, user_data is the base64 of the encoded form.
    int operate(const DoutPrefixProvider *dpp) override {
      // NOTE(review): put_obj is a function-local declared outside reenter();
      // if operate() is re-entered after the yield below (as with stackless
      // coroutines), the log lines that follow see a freshly constructed
      // put_obj rather than the values set before the yield -- confirm.
      rgw_object_simple_put_params put_obj;
      reenter(this) {

        put_obj.bucket = sub->bucket;
        put_obj.key = rgw_obj_key(oid_prefix + pse.id());

        pse.format(&put_obj.data);

        {
          bufferlist bl;
          pse.encode_event(bl);
          bufferlist bl64;
          bl.encode_base64(bl64);
          put_obj.user_data = bl64.to_str();
        }

        yield call(new RGWObjectSimplePutCR(sync_env->async_rados,
                                            sync_env->store,
                                            put_obj,
                                            dpp));
        if (retcode < 0) {
          ldpp_dout(dpp, 10) << "failed to store event: " << put_obj.bucket << "/" << put_obj.key << " ret=" << retcode << dendl;
          return set_cr_error(retcode);
        } else {
          ldpp_dout(dpp, 20) << "event stored: " << put_obj.bucket << "/" << put_obj.key << dendl;
        }

        return set_cr_done();
      }
      return 0;
    }
  };

  // Sends one event to the subscription's configured push endpoint.
  // NOTE(review): the template parameter list is missing in this copy.
  template
  class PushEventCR : public RGWCoroutine {
    RGWDataSyncCtx* const sc;
    RGWDataSyncEnv* const sync_env;
    const EventRef event;
    const PSSubConfigRef& sub_conf;

  public:
    PushEventCR(RGWDataSyncCtx* const _sc,
              const PSSubscriptionRef& _sub,
              const EventRef& _event) : RGWCoroutine(_sc->cct),
                                        sc(_sc), sync_env(_sc->env),
                                        event(_event),
                                        sub_conf(_sub->sub_conf) {
    }

    int operate(const DoutPrefixProvider *dpp) override {
      reenter(this) {
        // callers are expected to check for an endpoint before using this CR
        ceph_assert(sub_conf->push_endpoint);
        yield call(sub_conf->push_endpoint->send_to_completion_async(*event.get(), sync_env));

        if (retcode < 0) {
          ldpp_dout(dpp, 10) << "failed to push event: " << event->id <<
            " to endpoint: " << sub_conf->push_endpoint_name << " ret=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        ldpp_dout(dpp, 20) << "event: " << event->id <<
          " pushed to endpoint: " << sub_conf->push_endpoint_name << dendl;
        return set_cr_done();
      }
      return 0;
    }
  };

public:
  // construct from an already-built subscription config (shared with the caller)
  PSSubscription(RGWDataSyncCtx *_sc,
                 PSEnvRef _env,
                 PSSubConfigRef& _sub_conf) : sc(_sc), sync_env(_sc->env),
                                      env(_env),
                                      sub_conf(_sub_conf),
                                      data_access(std::make_shared(sync_env->store)) {}

  // construct from a user-level subscription config (converted in the body)
  PSSubscription(RGWDataSyncCtx *_sc,
                 PSEnvRef _env,
                 rgw_pubsub_sub_config& user_sub_conf) : sc(_sc), sync_env(_sc->env),
                                      env(_env),
// (continues the member-initializer list of the second PSSubscription constructor)
                                      sub_conf(std::make_shared()),
                                      data_access(std::make_shared(sync_env->store)) {
    sub_conf->from_user_conf(sync_env->cct, user_sub_conf, sync_env->dpp);
  }

  // releases the reference taken on init_cr in get_shared()
  virtual ~PSSubscription() {
    if (init_cr) {
      init_cr->put();
    }
  }

  // Factory: build the subscription from either config type and attach its
  // InitCR, taking a reference on it.
  // NOTE(review): template parameter/argument lists are missing in this copy
  // of the file (here and in the helpers and members below).
  template
  static PSSubscriptionRef get_shared(RGWDataSyncCtx *_sc,
                                PSEnvRef _env,
                                C& _sub_conf) {
    auto sub = std::make_shared(_sc, _env, _sub_conf);
    sub->init_cr = new InitCR(_sc, sub);
    sub->init_cr->get();
    return sub;
  }

  // run (or wait on) the singleton init coroutine on behalf of `caller`
  int call_init_cr(RGWCoroutine *caller) {
    return init_cr->execute(caller);
  }

  // allocate a coroutine that stores `event` for subscription `sub`
  template
  static RGWCoroutine *store_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef& event) {
    return new StoreEventCR(sc, sub, event);
  }

  // allocate a coroutine that pushes `event` to the endpoint of `sub`
  template
  static RGWCoroutine *push_event_cr(RGWDataSyncCtx* const sc, const PSSubscriptionRef& sub, const EventRef& event) {
    return new PushEventCR(sc, sub, event);
  }
  friend class InitCR;
};

// Keeps track of subscription instances and of in-flight "get subscription"
// coroutines, keyed by the string built in sub_id().
class PSManager
{
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;

  std::map subs;

  // Singleton coroutine that reads a subscription's stored configuration,
  // builds the PSSubscription and runs its initialisation.
  class GetSubCR : public RGWSingletonCR {
    RGWDataSyncCtx *sc;
    RGWDataSyncEnv *sync_env;
    PSManagerRef mgr;
    rgw_user owner;
    string sub_name;
    string sub_id;
    PSSubscriptionRef *ref;

    PSConfigRef conf;

    PSSubConfigRef sub_conf;
    rgw_pubsub_sub_config user_sub_conf;

  public:
    GetSubCR(RGWDataSyncCtx *_sc,
                      PSManagerRef& _mgr,
                      const rgw_user& _owner,
                      const string& _sub_name,
                      PSSubscriptionRef *_ref) : RGWSingletonCR(_sc->cct),
                                                 sc(_sc), sync_env(_sc->env),
                                                 mgr(_mgr),
                                                 owner(_owner),
                                                 sub_name(_sub_name),
                                                 ref(_ref),
                                                 conf(mgr->env->conf) {
    }
    ~GetSubCR() { }

    int operate(const DoutPrefixProvider *dpp) override {
      reenter(this) {
        if (owner.empty()) {
          ldpp_dout(dpp, 1) << "ERROR: missing user info when getting subscription: " << sub_name << dendl;
          // every exit path removes this CR's entry from the manager's map
          mgr->remove_get_sub(owner, sub_name);
          return set_cr_error(-EINVAL);
        } else {
          using ReadInfoCR = RGWSimpleRadosReadCR;
          yield {
            RGWPubSub ps(sync_env->store, owner.tenant);
            rgw_raw_obj obj;
            ps.get_sub_meta_obj(sub_name, &obj);
            // false: a missing object is reported as an error, not as empty data
            bool empty_on_enoent = false;
            call(new ReadInfoCR(dpp,
sync_env->async_rados, sync_env->store->svc()->sysobj, obj, &user_sub_conf, empty_on_enoent)); } if (retcode < 0) { mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } *ref = PSSubscription::get_shared(sc, mgr->env, user_sub_conf); } yield (*ref)->call_init_cr(this); if (retcode < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to init subscription when getting subscription: " << sub_name << dendl; mgr->remove_get_sub(owner, sub_name); return set_cr_error(retcode); } mgr->remove_get_sub(owner, sub_name); return set_cr_done(); } return 0; } void return_result(PSSubscriptionRef *result) override { ldout(cct, 20) << __func__ << "(): returning result: retcode=" << retcode << " resultp=" << (void *)result << dendl; if (retcode >= 0) { *result = *ref; } } }; string sub_id(const rgw_user& owner, const string& sub_name) { string owner_prefix; if (!owner.empty()) { owner_prefix = owner.to_str() + "/"; } return owner_prefix + sub_name; } std::map get_subs; GetSubCR *& get_get_subs(const rgw_user& owner, const string& name) { return get_subs[sub_id(owner, name)]; } void remove_get_sub(const rgw_user& owner, const string& name) { get_subs.erase(sub_id(owner, name)); } bool find_sub_instance(const rgw_user& owner, const string& sub_name, PSSubscriptionRef *sub) { auto iter = subs.find(sub_id(owner, sub_name)); if (iter != subs.end()) { *sub = iter->second; return true; } return false; } PSManager(RGWDataSyncCtx *_sc, PSEnvRef _env) : sc(_sc), sync_env(_sc->env), env(_env) {} public: static PSManagerRef get_shared(RGWDataSyncCtx *_sc, PSEnvRef _env) { return std::shared_ptr(new PSManager(_sc, _env)); } static int call_get_subscription_cr(RGWDataSyncCtx *sc, PSManagerRef& mgr, RGWCoroutine *caller, const rgw_user& owner, const string& sub_name, PSSubscriptionRef *ref) { if (mgr->find_sub_instance(owner, sub_name, ref)) { /* found it! 
nothing to execute */ ldout(sc->cct, 20) << __func__ << "(): found sub instance" << dendl; } auto& gs = mgr->get_get_subs(owner, sub_name); if (!gs) { ldout(sc->cct, 20) << __func__ << "(): first get subs" << dendl; gs = new GetSubCR(sc, mgr, owner, sub_name, ref); } ldout(sc->cct, 20) << __func__ << "(): executing get subs" << dendl; return gs->execute(caller, ref); } friend class GetSubCR; }; void PSEnv::init_instance(const RGWRealm& realm, uint64_t instance_id, PSManagerRef& mgr) { manager = mgr; conf->init_instance(realm, instance_id); } class RGWPSInitEnvCBCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; PSEnvRef env; PSConfigRef& conf; rgw_user_create_params create_user; rgw_get_user_info_params get_user_info; public: RGWPSInitEnvCBCR(RGWDataSyncCtx *_sc, PSEnvRef& _env) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), env(_env), conf(env->conf) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { ldpp_dout(dpp, 1) << ": init pubsub config zone=" << sc->source_zone << dendl; /* nothing to do here right now */ create_user.user = conf->user; create_user.max_buckets = 0; /* unlimited */ create_user.display_name = "pubsub"; create_user.generate_key = false; yield call(new RGWUserCreateCR(sync_env->async_rados, sync_env->store, create_user, dpp)); if (retcode < 0 && retcode != -ERR_USER_EXIST) { ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; return set_cr_error(retcode); } get_user_info.user = conf->user; yield call(new RGWGetUserInfoCR(sync_env->async_rados, sync_env->store, get_user_info, env->data_user_info, dpp)); if (retcode < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to create rgw user: ret=" << retcode << dendl; return set_cr_error(retcode); } ldpp_dout(dpp, 20) << "pubsub: get user info cr returned: " << json_str("obj", *env->data_user_info, true) << dendl; return set_cr_done(); } return 0; } }; bool match(const rgw_pubsub_topic_filter& filter, const std::string& key_name, 
// (continues the parameter list of match(filter, key_name, ...))
           rgw::notify::EventType event_type) {
  if (!match(filter.events, event_type)) {
    return false;
  }
  if (!match(filter.s3_filter.key_filter, key_name)) {
    return false;
  }
  return true;
}

// Collects into *topics the topics configured on `bucket` whose filter
// matches (key, event_type), attaching each topic's subscription names taken
// from the owner's topic metadata.
class RGWPSFindBucketTopicsCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  PSEnvRef env;
  rgw_user owner;
  rgw_bucket bucket;
  rgw_obj_key key;
  rgw::notify::EventType event_type;

  RGWPubSub ps;

  rgw_raw_obj bucket_obj;
  rgw_raw_obj user_obj;
  rgw_pubsub_bucket_topics bucket_topics;
  rgw_pubsub_topics user_topics;
  TopicsRef *topics;
public:
  // note: *_topics is replaced with a fresh (empty) container right away
  // NOTE(review): template arguments are missing in this copy of the file
  // (make_shared below, RGWSimpleRadosReadCR aliases in operate()).
  RGWPSFindBucketTopicsCR(RGWDataSyncCtx *_sc,
                      PSEnvRef& _env,
                      const rgw_user& _owner,
                      const rgw_bucket& _bucket,
                      const rgw_obj_key& _key,
                      rgw::notify::EventType _event_type,
                      TopicsRef *_topics) : RGWCoroutine(_sc->cct),
                                                          sc(_sc), sync_env(_sc->env),
                                                          env(_env),
                                                          owner(_owner),
                                                          bucket(_bucket),
                                                          key(_key),
                                                          event_type(_event_type),
                                                          ps(sync_env->store, owner.tenant),
                                                          topics(_topics) {
    *topics = std::make_shared >();
  }
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ps.get_bucket_meta_obj(bucket, &bucket_obj);
      ps.get_meta_obj(&user_obj);

      // read the bucket's topic list; a missing object is treated as "no topics"
      using ReadInfoCR = RGWSimpleRadosReadCR;
      yield {
        bool empty_on_enoent = true;
        call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                            bucket_obj,
                            &bucket_topics, empty_on_enoent));
      }
      if (retcode < 0 && retcode != -ENOENT) {
        return set_cr_error(retcode);
      }

      ldpp_dout(dpp, 20) << "RGWPSFindBucketTopicsCR(): found " << bucket_topics.topics.size() << " topics for bucket " << bucket << dendl;

      // the owner's topic metadata is only needed when the bucket has topics
      if (!bucket_topics.topics.empty()) {
        using ReadUserTopicsInfoCR = RGWSimpleRadosReadCR;
        yield {
          bool empty_on_enoent = true;
          call(new ReadUserTopicsInfoCR(dpp, sync_env->async_rados, sync_env->store->svc()->sysobj,
                                        user_obj,
                                        &user_topics, empty_on_enoent));
        }
        if (retcode < 0 && retcode != -ENOENT) {
          return set_cr_error(retcode);
        }
      }

      for (auto& titer : bucket_topics.topics) {
        auto& topic_filter = titer.second;
        auto& info = topic_filter.topic;
        if (!match(topic_filter, key.name,
// (continues RGWPSFindBucketTopicsCR::operate: last argument of the pending match() call)
                   event_type)) {
          continue;
        }
        std::shared_ptr tc = std::make_shared();
        tc->name = info.name;
        // subscriptions come from the owner's topic metadata, looked up by name
        tc->subs = user_topics.topics[info.name].subs;
        tc->opaque_data = info.opaque_data;
        (*topics)->push_back(tc);
      }

      return set_cr_done();
    }
    return 0;
  }
};

// For every subscription of every matching topic: resolve the subscription,
// then store the event in its bucket and, if a push endpoint is configured,
// push it. Subscriptions with an empty s3_id get `event`, the others get
// `s3_event`. Perf counters are updated along the way.
// NOTE(review): template arguments are missing in this copy of the file
// (EventRef members, iterator types, and the declarations further below).
class RGWPSHandleObjEventCR : public RGWCoroutine {
  RGWDataSyncCtx* const sc;
  const PSEnvRef env;
  const rgw_user owner;
  const EventRef event;
  const EventRef s3_event;
  const TopicsRef topics;
  bool has_subscriptions;
  bool event_handled;
  PSSubscriptionRef sub;
  // loop positions are members so that they survive across yields
  std::vector::const_iterator titer;
  std::set::const_iterator siter;

public:
  RGWPSHandleObjEventCR(RGWDataSyncCtx* const _sc,
                      const PSEnvRef _env,
                      const rgw_user& _owner,
                      const EventRef& _event,
                      const EventRef& _s3_event,
                      const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
                                          sc(_sc),
                                          env(_env),
                                          owner(_owner),
                                          event(_event),
                                          s3_event(_s3_event),
                                          topics(_topics),
                                          has_subscriptions(false),
                                          event_handled(false) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 20) << ": handle event: obj: z=" << sc->source_zone
                               << " event=" << json_str("event", *event, false)
                               << " owner=" << owner << dendl;

      ldpp_dout(dpp, 20) << "pubsub: " << topics->size() << " topics found for path" << dendl;

      // outside caller should check that
      ceph_assert(!topics->empty());

      if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);

      // loop over all topics related to the bucket/object
      for (titer = topics->begin(); titer != topics->end(); ++titer) {
        ldpp_dout(dpp, 20) << ": notification for " << event->source << ": topic=" <<
          (*titer)->name << ", has " << (*titer)->subs.size() << " subscriptions" << dendl;
        // loop over all subscriptions of the topic
        for (siter = (*titer)->subs.begin(); siter != (*titer)->subs.end(); ++siter) {
          ldpp_dout(dpp, 20) << ": subscription: " << *siter << dendl;
          has_subscriptions = true;
          // try to read subscription configuration
          yield PSManager::call_get_subscription_cr(sc, env->manager, this, owner, *siter, &sub);
          if (retcode < 0) {
            if (perfcounter) perfcounter->inc(l_rgw_pubsub_missing_conf);
            ldpp_dout(dpp, 1) << "ERROR: failed to find subscription config for subscription=" << *siter
              << " ret=" << retcode << dendl;
            if (retcode == -ENOENT) {
              // missing subscription info should be reflected back as invalid argument
              // and not as missing object
              retcode = -EINVAL;
            }
            // try the next subscription
            continue;
          }
          if (sub->sub_conf->s3_id.empty()) {
            // subscription was not made by S3 compatible API
            ldpp_dout(dpp, 20) << "storing event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
            yield call(PSSubscription::store_event_cr(sc, sub, event));
            if (retcode < 0) {
              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
              ldpp_dout(dpp, 1) << "ERROR: failed to store event for subscription=" << *siter << " ret=" << retcode << dendl;
            } else {
              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
              event_handled = true;
            }
            // pushing is attempted regardless of whether storing succeeded
            if (sub->sub_conf->push_endpoint) {
              ldpp_dout(dpp, 20) << "push event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
              yield call(PSSubscription::push_event_cr(sc, sub, event));
              if (retcode < 0) {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                ldpp_dout(dpp, 1) << "ERROR: failed to push event for subscription=" << *siter << " ret=" << retcode << dendl;
              } else {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                event_handled = true;
              }
            }
          } else {
            // subscription was made by S3 compatible API
            ldpp_dout(dpp, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
            // the shared s3_event is updated in place for each subscription
            s3_event->configurationId = sub->sub_conf->s3_id;
            s3_event->opaque_data = (*titer)->opaque_data;
            yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
            if (retcode < 0) {
              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
              ldpp_dout(dpp, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
            } else {
              if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
              event_handled = true;
            }
            if (sub->sub_conf->push_endpoint) {
              ldpp_dout(dpp, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
              yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
              if (retcode < 0) {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
                ldpp_dout(dpp, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
              } else {
                if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                event_handled = true;
              }
            }
          }
        }
      }
      if (has_subscriptions && !event_handled) {
        // event is considered "lost" if it has subscriptions on any of its topics
        // but it was not stored in, or pushed to, any of them
        if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
      }
      // retcode here is whatever the last step in the loops above left behind
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

// coroutine invoked on remote object creation
class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
  RGWDataSyncCtx *sc;
  rgw_bucket_sync_pipe sync_pipe;
  PSEnvRef env;
  std::optional versioned_epoch;
  EventRef event;
  EventRef s3_event;
  TopicsRef topics;
public:
  RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                          PSEnvRef _env, std::optional _versioned_epoch,
                          TopicsRef& _topics) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                                      sc(_sc),
                                                                      sync_pipe(_sync_pipe),
                                                                      env(_env),
                                                                      versioned_epoch(_versioned_epoch),
                                                                      topics(_topics) {
  }
  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      ldpp_dout(dpp, 20) << ": stat of remote obj: z=" << sc->source_zone
                               << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
                               << " attrs=" << attrs << dendl;
      {
        // NOTE(review): this local `attrs` hides the `attrs` that is logged
        // just above, so the range-for below iterates the freshly declared
        // (empty) vector and its body never runs; the events are then built
        // with an empty attribute list. The loop was presumably meant to
        // read the outer attrs -- confirm the outer element types before
        // changing it.
        std::vector > attrs;
        for (auto& attr : attrs) {
          std::string k = attr.first;
          // drop the RGW attribute-name prefix when present
          if (boost::algorithm::starts_with(k, RGW_ATTR_PREFIX)) {
            k = k.substr(sizeof(RGW_ATTR_PREFIX) - 1);
          }
          attrs.push_back(std::make_pair(k,
// (continues RGWPSHandleRemoteObjCBCR::operate: second argument of the pending make_pair call)
                                         attr.second));
        }
        // at this point we don't know whether we need the ceph event or S3 event
        // this is why both are created here, once we have information about the
        // subscription, we will store/push only the relevant ones
        make_event_ref(sc->cct,
                       sync_pipe.info.source_bs.bucket, key,
                       mtime, &attrs,
                       rgw::notify::ObjectCreated, &event);
        make_s3_event_ref(sc->cct,
                       sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
                       mtime, &attrs,
                       rgw::notify::ObjectCreated, &s3_event);
      }

      // hand both events to the per-subscription handler
      yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

// Stat-remote-object coroutine whose callback is RGWPSHandleRemoteObjCBCR.
// NOTE(review): template arguments are missing in this copy of the file
// (e.g. the std::optional members and parameters below).
class RGWPSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
  rgw_bucket_sync_pipe sync_pipe;
  PSEnvRef env;
  std::optional versioned_epoch;
  TopicsRef topics;
public:
  RGWPSHandleRemoteObjCR(RGWDataSyncCtx *_sc,
                        rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                        PSEnvRef _env, std::optional _versioned_epoch,
                        TopicsRef& _topics) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
                                                           sync_pipe(_sync_pipe),
                                                           env(_env), versioned_epoch(_versioned_epoch),
                                                           topics(_topics) {
  }

  ~RGWPSHandleRemoteObjCR() override {}

  // allocate the callback that turns the stat result into events
  RGWStatRemoteObjCBCR *allocate_callback() override {
    return new RGWPSHandleRemoteObjCBCR(sc, sync_pipe, key, env, versioned_epoch, topics);
  }
};

// Handles an object creation: finds the matching topics and, if there are
// any, stats the remote object and dispatches the events.
class RGWPSHandleObjCreateCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  rgw_bucket_sync_pipe sync_pipe;
  rgw_obj_key key;
  PSEnvRef env;
  std::optional versioned_epoch;
  TopicsRef topics;
public:
  RGWPSHandleObjCreateCR(RGWDataSyncCtx *_sc,
                       rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
                       PSEnvRef _env, std::optional _versioned_epoch) : RGWCoroutine(_sc->cct),
                                                                   sc(_sc),
                                                                   sync_pipe(_sync_pipe),
                                                                   key(_key),
                                                                   env(_env),
                                                                   versioned_epoch(_versioned_epoch) {
  }

  ~RGWPSHandleObjCreateCR() override {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWPSFindBucketTopicsCR(sc, env,
sync_pipe.dest_bucket_info.owner, sync_pipe.info.source_bs.bucket, key, rgw::notify::ObjectCreated, &topics)); if (retcode < 0) { ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { ldpp_dout(dpp, 20) << "no topics found for " << sync_pipe.info.source_bs.bucket << "/" << key << dendl; return set_cr_done(); } yield call(new RGWPSHandleRemoteObjCR(sc, sync_pipe, key, env, versioned_epoch, topics)); if (retcode < 0) { return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; // coroutine invoked on remote object deletion class RGWPSGenericObjEventCBCR : public RGWCoroutine { RGWDataSyncCtx *sc; PSEnvRef env; rgw_user owner; rgw_bucket bucket; rgw_obj_key key; ceph::real_time mtime; rgw::notify::EventType event_type; EventRef event; EventRef s3_event; TopicsRef topics; public: RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc, PSEnvRef _env, rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime, rgw::notify::EventType _event_type) : RGWCoroutine(_sc->cct), sc(_sc), env(_env), owner(_sync_pipe.dest_bucket_info.owner), bucket(_sync_pipe.dest_bucket_info.bucket), key(_key), mtime(_mtime), event_type(_event_type) {} int operate(const DoutPrefixProvider *dpp) override { reenter(this) { ldpp_dout(dpp, 20) << ": remove remote obj: z=" << sc->source_zone << " b=" << bucket << " k=" << key << " mtime=" << mtime << dendl; yield call(new RGWPSFindBucketTopicsCR(sc, env, owner, bucket, key, event_type, &topics)); if (retcode < 0) { ldpp_dout(dpp, 1) << "ERROR: RGWPSFindBucketTopicsCR returned ret=" << retcode << dendl; return set_cr_error(retcode); } if (topics->empty()) { ldpp_dout(dpp, 20) << "no topics found for " << bucket << "/" << key << dendl; return set_cr_done(); } // at this point we don't know whether we need the ceph event or S3 event // this is why both are created here, once we have information about the // subscription, we will store/push 
only the relevant ones make_event_ref(sc->cct, bucket, key, mtime, nullptr, event_type, &event); make_s3_event_ref(sc->cct, bucket, owner, key, mtime, nullptr, event_type, &s3_event); yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics)); if (retcode < 0) { return set_cr_error(retcode); } return set_cr_done(); } return 0; } }; class RGWPSDataSyncModule : public RGWDataSyncModule { PSEnvRef env; PSConfigRef& conf; public: RGWPSDataSyncModule(CephContext *cct, const JSONFormattable& config) : env(std::make_shared()), conf(env->conf) { env->init(cct, config); } ~RGWPSDataSyncModule() override {} void init(RGWDataSyncCtx *sc, uint64_t instance_id) override { auto sync_env = sc->env; PSManagerRef mgr = PSManager::get_shared(sc, env); env->init_instance(sync_env->svc->zone->get_realm(), instance_id, mgr); } RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override { ldout(sc->cct, 5) << conf->id << ": start" << dendl; return new RGWPSInitEnvCBCR(sc, env); } RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl; return new RGWPSHandleObjCreateCR(sc, sync_pipe, key, env, versioned_epoch); } RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDelete); } RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, 
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override { ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl; return new RGWPSGenericObjEventCBCR(sc, env, sync_pipe, key, mtime, rgw::notify::ObjectRemovedDeleteMarkerCreated); } PSConfigRef& get_conf() { return conf; } }; RGWPSSyncModuleInstance::RGWPSSyncModuleInstance(CephContext *cct, const JSONFormattable& config) { data_handler = std::unique_ptr(new RGWPSDataSyncModule(cct, config)); const std::string jconf = json_str("conf", *data_handler->get_conf()); JSONParser p; if (!p.parse(jconf.c_str(), jconf.size())) { ldout(cct, 1) << "ERROR: failed to parse sync module effective conf: " << jconf << dendl; effective_conf = config; } else { effective_conf.decode_json(&p); } } RGWDataSyncModule *RGWPSSyncModuleInstance::get_data_handler() { return data_handler.get(); } RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *orig) { if (dialect != RGW_REST_S3) { return orig; } return new RGWRESTMgr_PubSub(); } bool RGWPSSyncModuleInstance::should_full_sync() const { return data_handler->get_conf()->start_with_full_sync; } int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) { instance->reset(new RGWPSSyncModuleInstance(cct, config)); return 0; }