// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include #include #include #include "rgw_rest_pubsub_common.h" #include "rgw_rest_pubsub.h" #include "rgw_pubsub_push.h" #include "rgw_pubsub.h" #include "rgw_sync_module_pubsub.h" #include "rgw_op.h" #include "rgw_rest.h" #include "rgw_rest_s3.h" #include "rgw_arn.h" #include "rgw_auth_s3.h" #include "rgw_notify.h" #include "rgw_sal_rados.h" #include "services/svc_zone.h" #define dout_context g_ceph_context #define dout_subsys ceph_subsys_rgw static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/"); // command (AWS compliant): // POST // Action=CreateTopic&Name=[&OpaqueData=data][&push-endpoint=[&persistent][&=]] class RGWPSCreateTopic_ObjStore_AWS : public RGWPSCreateTopicOp { public: int get_params() override { topic_name = s->info.args.get("Name"); if (topic_name.empty()) { ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl; return -EINVAL; } opaque_data = s->info.args.get("OpaqueData"); dest.push_endpoint = s->info.args.get("push-endpoint"); s->info.args.get_bool("persistent", &dest.persistent, false); if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) { return -EINVAL; } for (const auto& param : s->info.args.get_params()) { if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") { continue; } dest.push_endpoint_args.append(param.first+"="+param.second+"&"); } if (!dest.push_endpoint_args.empty()) { // remove last separator dest.push_endpoint_args.pop_back(); } if (!dest.push_endpoint.empty() && dest.persistent) { const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield); if (ret < 0) { ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl; return ret; } } // dest object only stores endpoint info // bucket to store events/records will be set only when subscription is created dest.bucket_name = ""; dest.oid_prefix = ""; dest.arn_topic = topic_name; // the topic ARN will be sent in the reply const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns, store->svc()->zone->get_zonegroup().get_name(), s->user->get_tenant(), topic_name); topic_arn = arn.to_string(); return 0; } void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } const auto f = s->formatter; f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS); f->open_object_section("CreateTopicResult"); encode_xml("TopicArn", topic_arn, f); f->close_section(); // CreateTopicResult f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); f->close_section(); // ResponseMetadata f->close_section(); // CreateTopicResponse rgw_flush_formatter_and_reset(s, f); } }; // command (AWS compliant): // POST // Action=ListTopics class RGWPSListTopics_ObjStore_AWS : public RGWPSListTopicsOp { public: void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } const auto f = s->formatter; f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS); f->open_object_section("ListTopicsResult"); encode_xml("Topics", result, f); f->close_section(); // ListTopicsResult f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); f->close_section(); // ResponseMetadat f->close_section(); // ListTopicsResponse rgw_flush_formatter_and_reset(s, f); } }; // command (extension to AWS): // POST // Action=GetTopic&TopicArn= class RGWPSGetTopic_ObjStore_AWS : public RGWPSGetTopicOp { public: int get_params() override { const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); if (!topic_arn || topic_arn->resource.empty()) { ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl; return -EINVAL; } topic_name = topic_arn->resource; return 0; } void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } const auto f = s->formatter; f->open_object_section("GetTopicResponse"); f->open_object_section("GetTopicResult"); encode_xml("Topic", result.topic, f); f->close_section(); f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); f->close_section(); f->close_section(); rgw_flush_formatter_and_reset(s, f); } }; // command (AWS compliant): // POST // Action=GetTopicAttributes&TopicArn= class RGWPSGetTopicAttributes_ObjStore_AWS : public RGWPSGetTopicOp { public: int get_params() override { const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); if (!topic_arn || topic_arn->resource.empty()) { ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl; return -EINVAL; } topic_name = topic_arn->resource; return 0; } void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } const auto f = s->formatter; f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS); f->open_object_section("GetTopicAttributesResult"); result.topic.dump_xml_as_attributes(f); f->close_section(); // GetTopicAttributesResult f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); f->close_section(); // ResponseMetadata f->close_section(); // GetTopicAttributesResponse rgw_flush_formatter_and_reset(s, f); } }; // command (AWS compliant): // POST // Action=DeleteTopic&TopicArn= class RGWPSDeleteTopic_ObjStore_AWS : public RGWPSDeleteTopicOp { public: int get_params() override { const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn"))); if (!topic_arn || topic_arn->resource.empty()) { ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl; return -EINVAL; } topic_name = topic_arn->resource; // upon deletion it is not known if topic is persistent or not // will try to delete the persistent topic anyway const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield); if (ret == -ENOENT) { // topic was not persistent, or already deleted return 0; } if (ret < 0) { ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl; return ret; } return 0; } void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } const auto f = s->formatter; f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS); f->open_object_section("ResponseMetadata"); encode_xml("RequestId", s->req_id, f); f->close_section(); // ResponseMetadata f->close_section(); // DeleteTopicResponse rgw_flush_formatter_and_reset(s, f); } }; namespace { // utility classes and functions for handling parameters with the following format: // Attributes.entry.{N}.{key|value}={VALUE} // N - any unsigned number // VALUE - url encoded string // and Attribute is holding key and value // ctor and set are done according to the "type" argument // if type is not "key" or "value" its a no-op class Attribute { std::string key; std::string value; public: Attribute(const std::string& type, const std::string& key_or_value) { set(type, key_or_value); } void set(const std::string& type, const std::string& key_or_value) { if (type == "key") { key = key_or_value; } else if (type == "value") { value = key_or_value; } } const std::string& get_key() const { return key; } const std::string& get_value() const { return value; } }; using AttributeMap = std::map; // aggregate the attributes into a map // the key and value are associated by the index (N) // no assumptions are made on the order in which these parameters are added void update_attribute_map(const std::string& input, AttributeMap& map) { const boost::char_separator sep("."); const boost::tokenizer tokens(input, sep); auto token = tokens.begin(); if (*token != "Attributes") { return; } ++token; if (*token != "entry") { return; } ++token; unsigned idx; try { idx = std::stoul(*token); } catch (const std::invalid_argument&) { return; } ++token; std::string key_or_value = ""; // get the rest of the string regardless of dots // this is to allow dots in the value while (token != tokens.end()) { key_or_value.append(*token+"."); ++token; } // remove last separator key_or_value.pop_back(); auto pos = key_or_value.find("="); if (pos != string::npos) { const auto key_or_value_lhs = key_or_value.substr(0, pos); const auto key_or_value_rhs = url_decode(key_or_value.substr(pos + 1, key_or_value.size() - 1)); const auto map_it = map.find(idx); if (map_it == map.end()) { // new entry map.emplace(std::make_pair(idx, Attribute(key_or_value_lhs, key_or_value_rhs))); } else { // existing entry map_it->second.set(key_or_value_lhs, key_or_value_rhs); } } } } void RGWHandler_REST_PSTopic_AWS::rgw_topic_parse_input() { if (post_body.size() > 0) { ldpp_dout(s, 10) << "Content of POST: " << post_body << dendl; if (post_body.find("Action") != string::npos) { const boost::char_separator sep("&"); const boost::tokenizer> tokens(post_body, sep); AttributeMap map; for (const auto& t : tokens) { auto pos = t.find("="); if (pos != string::npos) { const auto key = t.substr(0, pos); if (key == "Action") { s->info.args.append(key, t.substr(pos + 1, t.size() - 1)); } else if (key == "Name" || key == "TopicArn") { const auto value = url_decode(t.substr(pos + 1, t.size() - 1)); s->info.args.append(key, value); } else { update_attribute_map(t, map); } } } // update the regular args with the content of the attribute map for (const auto& attr : map) { s->info.args.append(attr.second.get_key(), attr.second.get_value()); } } const auto payload_hash = rgw::auth::s3::calc_v4_payload_hash(post_body); s->info.args.append("PayloadHash", payload_hash); } } RGWOp* RGWHandler_REST_PSTopic_AWS::op_post() { rgw_topic_parse_input(); if (s->info.args.exists("Action")) { const auto action = s->info.args.get("Action"); if (action.compare("CreateTopic") == 0) return new RGWPSCreateTopic_ObjStore_AWS(); if (action.compare("DeleteTopic") == 0) return new RGWPSDeleteTopic_ObjStore_AWS; if (action.compare("ListTopics") == 0) return new RGWPSListTopics_ObjStore_AWS(); if (action.compare("GetTopic") == 0) return new RGWPSGetTopic_ObjStore_AWS(); if (action.compare("GetTopicAttributes") == 0) return new RGWPSGetTopicAttributes_ObjStore_AWS(); } return nullptr; } int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) { return RGW_Auth_S3::authorize(dpp, store, auth_registry, s, y); } namespace { // return a unique topic by prefexing with the notification name: _ std::string topic_to_unique(const std::string& topic, const std::string& notification) { return notification + "_" + topic; } // extract the topic from a unique topic of the form: _ [[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) { if (unique_topic.find(notification + "_") == string::npos) { return ""; } return unique_topic.substr(notification.length() + 1); } // from list of bucket topics, find the one that was auto-generated by a notification auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) { auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; }); return it != bucket_topics.topics.end() ? std::optional>(it->second): std::nullopt; } } // command (S3 compliant): PUT /?notification // a "notification" and a subscription will be auto-generated // actual configuration is XML encoded in the body of the message class RGWPSCreateNotif_ObjStore_S3 : public RGWPSCreateNotifOp { rgw_pubsub_s3_notifications configurations; int get_params_from_body() { const auto max_size = s->cct->_conf->rgw_max_put_param_size; int r; bufferlist data; std::tie(r, data) = rgw_rest_read_all_input(s, max_size, false); if (r < 0) { ldpp_dout(this, 1) << "failed to read XML payload" << dendl; return r; } if (data.length() == 0) { ldpp_dout(this, 1) << "XML payload missing" << dendl; return -EINVAL; } RGWXMLDecoder::XMLParser parser; if (!parser.init()){ ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl; return -EINVAL; } if (!parser.parse(data.c_str(), data.length(), 1)) { ldpp_dout(this, 1) << "failed to parse XML payload" << dendl; return -ERR_MALFORMED_XML; } try { // NotificationConfigurations is mandatory RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true); } catch (RGWXMLDecoder::err& err) { ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl; return -ERR_MALFORMED_XML; } return 0; } int get_params() override { bool exists; const auto no_value = s->info.args.get("notification", &exists); if (!exists) { ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; return -EINVAL; } if (no_value.length() > 0) { ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl; return -EINVAL; } if (s->bucket_name.empty()) { ldpp_dout(this, 1) << "request must be on a bucket" << dendl; return -EINVAL; } bucket_name = s->bucket_name; return 0; } public: const char* name() const override { return "pubsub_notification_create_s3"; } void execute(optional_yield) override; }; void RGWPSCreateNotif_ObjStore_S3::execute(optional_yield y) { op_ret = get_params_from_body(); if (op_ret < 0) { return; } ps.emplace(store, s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); std::string data_bucket_prefix = ""; std::string data_oid_prefix = ""; bool push_only = true; if (store->getRados()->get_sync_module()) { const auto psmodule = dynamic_cast(store->getRados()->get_sync_module().get()); if (psmodule) { const auto& conf = psmodule->get_effective_conf(); data_bucket_prefix = conf["data_bucket_prefix"]; data_oid_prefix = conf["data_oid_prefix"]; // TODO: allow "push-only" on PS zone as well push_only = false; } } for (const auto& c : configurations.list) { const auto& notif_name = c.id; if (notif_name.empty()) { ldpp_dout(this, 1) << "missing notification id" << dendl; op_ret = -EINVAL; return; } if (c.topic_arn.empty()) { ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl; op_ret = -EINVAL; return; } const auto arn = rgw::ARN::parse(c.topic_arn); if (!arn || arn->resource.empty()) { ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl; op_ret = -EINVAL; return; } if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) { ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl; op_ret = -EINVAL; return; } const auto topic_name = arn->resource; // get topic information. destination information is stored in the topic rgw_pubsub_topic topic_info; op_ret = ps->get_topic(topic_name, &topic_info); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl; return; } // make sure that full topic configuration match // TODO: use ARN match function // create unique topic name. this has 2 reasons: // (1) topics cannot be shared between different S3 notifications because they hold the filter information // (2) make topic clneaup easier, when notification is removed const auto unique_topic_name = topic_to_unique(topic_name, notif_name); // generate the internal topic. destination is stored here for the "push-only" case // when no subscription exists // ARN is cached to make the "GET" method faster op_ret = ps->create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name << "', ret=" << op_ret << dendl; return; } ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl; // generate the notification rgw::notify::EventTypeList events; op_ret = b->create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name << "', ret=" << op_ret << dendl; // rollback generated topic (ignore return value) ps->remove_topic(this, unique_topic_name, y); return; } ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl; if (!push_only) { // generate the subscription with destination information from the original topic rgw_pubsub_sub_dest dest = topic_info.dest; dest.bucket_name = data_bucket_prefix + s->owner.get_id().to_str() + "-" + unique_topic_name; dest.oid_prefix = data_oid_prefix + notif_name + "/"; auto sub = ps->get_sub(notif_name); op_ret = sub->subscribe(this, unique_topic_name, dest, y, notif_name); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to auto-generate subscription '" << notif_name << "', ret=" << op_ret << dendl; // rollback generated notification (ignore return value) b->remove_notification(this, unique_topic_name, y); // rollback generated topic (ignore return value) ps->remove_topic(this, unique_topic_name, y); return; } ldpp_dout(this, 20) << "successfully auto-generated subscription '" << notif_name << "'" << dendl; } } } // command (extension to S3): DELETE /bucket?notification[=] class RGWPSDeleteNotif_ObjStore_S3 : public RGWPSDeleteNotifOp { private: std::string notif_name; int get_params() override { bool exists; notif_name = s->info.args.get("notification", &exists); if (!exists) { ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; return -EINVAL; } if (s->bucket_name.empty()) { ldpp_dout(this, 1) << "request must be on a bucket" << dendl; return -EINVAL; } bucket_name = s->bucket_name; return 0; } void remove_notification_by_topic(const std::string& topic_name, const RGWPubSub::BucketRef& b, optional_yield y) { op_ret = b->remove_notification(this, topic_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl; } op_ret = ps->remove_topic(this, topic_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl; } } public: void execute(optional_yield y) override; const char* name() const override { return "pubsub_notification_delete_s3"; } }; void RGWPSDeleteNotif_ObjStore_S3::execute(optional_yield y) { op_ret = get_params(); if (op_ret < 0) { return; } ps.emplace(store, s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; op_ret = b->get_topics(&bucket_topics); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; return; } if (!notif_name.empty()) { // delete a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notif_name); if (unique_topic) { // remove the auto generated subscription according to notification name (if exist) const auto unique_topic_name = unique_topic->get().topic.name; auto sub = ps->get_sub(notif_name); op_ret = sub->unsubscribe(this, unique_topic_name, y); if (op_ret < 0 && op_ret != -ENOENT) { ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << notif_name << "', ret=" << op_ret << dendl; return; } remove_notification_by_topic(unique_topic_name, b, y); return; } // notification to be removed is not found - considered success ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl; return; } // delete all notification of on a bucket for (const auto& topic : bucket_topics.topics) { // remove the auto generated subscription of the topic (if exist) rgw_pubsub_topic_subs topic_subs; op_ret = ps->get_topic(topic.first, &topic_subs); for (const auto& topic_sub_name : topic_subs.subs) { auto sub = ps->get_sub(topic_sub_name); rgw_pubsub_sub_config sub_conf; op_ret = sub->get_conf(&sub_conf); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get subscription '" << topic_sub_name << "' info, ret=" << op_ret << dendl; return; } if (!sub_conf.s3_id.empty()) { // S3 notification, has autogenerated subscription const auto& sub_topic_name = sub_conf.topic; op_ret = sub->unsubscribe(this, sub_topic_name, y); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to remove auto-generated subscription '" << topic_sub_name << "', ret=" << op_ret << dendl; return; } } } remove_notification_by_topic(topic.first, b, y); } } // command (S3 compliant): GET /bucket?notification[=] class RGWPSListNotifs_ObjStore_S3 : public RGWPSListNotifsOp { private: std::string notif_name; rgw_pubsub_s3_notifications notifications; int get_params() override { bool exists; notif_name = s->info.args.get("notification", &exists); if (!exists) { ldpp_dout(this, 1) << "missing required param 'notification'" << dendl; return -EINVAL; } if (s->bucket_name.empty()) { ldpp_dout(this, 1) << "request must be on a bucket" << dendl; return -EINVAL; } bucket_name = s->bucket_name; return 0; } public: void execute(optional_yield y) override; void send_response() override { if (op_ret) { set_req_state_err(s, op_ret); } dump_errno(s); end_header(s, this, "application/xml"); if (op_ret < 0) { return; } notifications.dump_xml(s->formatter); rgw_flush_formatter_and_reset(s, s->formatter); } const char* name() const override { return "pubsub_notifications_get_s3"; } }; void RGWPSListNotifs_ObjStore_S3::execute(optional_yield y) { ps.emplace(store, s->owner.get_id().tenant); auto b = ps->get_bucket(bucket_info.bucket); ceph_assert(b); // get all topics on a bucket rgw_pubsub_bucket_topics bucket_topics; op_ret = b->get_topics(&bucket_topics); if (op_ret < 0) { ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << bucket_info.bucket.name << "', ret=" << op_ret << dendl; return; } if (!notif_name.empty()) { // get info of a specific notification const auto unique_topic = find_unique_topic(bucket_topics, notif_name); if (unique_topic) { notifications.list.emplace_back(unique_topic->get()); return; } op_ret = -ENOENT; ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl; return; } // loop through all topics of the bucket for (const auto& topic : bucket_topics.topics) { if (topic.second.s3_id.empty()) { // not an s3 notification continue; } notifications.list.emplace_back(topic.second); } } RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() { return new RGWPSListNotifs_ObjStore_S3(); } RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() { return new RGWPSCreateNotif_ObjStore_S3(); } RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() { return new RGWPSDeleteNotif_ObjStore_S3(); } RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() { return new RGWPSListNotifs_ObjStore_S3(); } RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() { return new RGWPSCreateNotif_ObjStore_S3(); } RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() { return new RGWPSDeleteNotif_ObjStore_S3(); }