summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_rest_pubsub.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_rest_pubsub.cc')
-rw-r--r--src/rgw/rgw_rest_pubsub.cc954
1 files changed, 954 insertions, 0 deletions
diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc
new file mode 100644
index 000000000..793232866
--- /dev/null
+++ b/src/rgw/rgw_rest_pubsub.cc
@@ -0,0 +1,954 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include <boost/tokenizer.hpp>
+#include <optional>
+#include "rgw_rest_pubsub.h"
+#include "rgw_pubsub_push.h"
+#include "rgw_pubsub.h"
+#include "rgw_op.h"
+#include "rgw_rest.h"
+#include "rgw_rest_s3.h"
+#include "rgw_arn.h"
+#include "rgw_auth_s3.h"
+#include "rgw_notify.h"
+#include "services/svc_zone.h"
+#include "common/dout.h"
+#include "rgw_url.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+static const char* AWS_SNS_NS("https://sns.amazonaws.com/doc/2010-03-31/");
+
+bool verify_transport_security(CephContext *cct, const RGWEnv& env) {
+ const auto is_secure = rgw_transport_is_secure(cct, env);
+ if (!is_secure && g_conf().get_val<bool>("rgw_allow_notification_secrets_in_cleartext")) {
+ ldout(cct, 0) << "WARNING: bypassing endpoint validation, allows sending secrets over insecure transport" << dendl;
+ return true;
+ }
+ return is_secure;
+}
+
+// make sure that endpoint is a valid URL
+// make sure that if user/password are passed inside URL, it is over secure connection
+// update rgw_pubsub_dest to indicate that a password is stored in the URL
+bool validate_and_update_endpoint_secret(rgw_pubsub_dest& dest, CephContext *cct, const RGWEnv& env) {
+ if (dest.push_endpoint.empty()) {
+ return true;
+ }
+ std::string user;
+ std::string password;
+ if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+ ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+ return false;
+ }
+ // this should be verified inside parse_url()
+ ceph_assert(user.empty() == password.empty());
+ if (!user.empty()) {
+ dest.stored_secret = true;
+ if (!verify_transport_security(cct, env)) {
+ ldout(cct, 1) << "endpoint validation error: sending secrets over insecure transport" << dendl;
+ return false;
+ }
+ }
+ return true;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic& topic) {
+ return topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_topics& topics) {
+ for (const auto& topic : topics.topics) {
+ if (topic_has_endpoint_secret(topic.second)) return true;
+ }
+ return false;
+}
+
+// command (AWS compliant):
+// POST
+// Action=CreateTopic&Name=<topic-name>[&OpaqueData=data][&push-endpoint=<endpoint>[&persistent][&<arg1>=<value1>]]
+class RGWPSCreateTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_dest dest;
+ std::string topic_arn;
+ std::string opaque_data;
+
+ int get_params() {
+ topic_name = s->info.args.get("Name");
+ if (topic_name.empty()) {
+ ldpp_dout(this, 1) << "CreateTopic Action 'Name' argument is missing" << dendl;
+ return -EINVAL;
+ }
+
+ opaque_data = s->info.args.get("OpaqueData");
+
+ dest.push_endpoint = s->info.args.get("push-endpoint");
+ s->info.args.get_bool("persistent", &dest.persistent, false);
+
+ if (!validate_and_update_endpoint_secret(dest, s->cct, *(s->info.env))) {
+ return -EINVAL;
+ }
+ for (const auto& param : s->info.args.get_params()) {
+ if (param.first == "Action" || param.first == "Name" || param.first == "PayloadHash") {
+ continue;
+ }
+ dest.push_endpoint_args.append(param.first+"="+param.second+"&");
+ }
+
+ if (!dest.push_endpoint_args.empty()) {
+ // remove last separator
+ dest.push_endpoint_args.pop_back();
+ }
+ if (!dest.push_endpoint.empty() && dest.persistent) {
+ const auto ret = rgw::notify::add_persistent_topic(topic_name, s->yield);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "CreateTopic Action failed to create queue for persistent topics. error:" << ret << dendl;
+ return ret;
+ }
+ }
+
+ // dest object only stores endpoint info
+ dest.arn_topic = topic_name;
+ // the topic ARN will be sent in the reply
+ const rgw::ARN arn(rgw::Partition::aws, rgw::Service::sns,
+ driver->get_zone()->get_zonegroup().get_name(),
+ s->user->get_tenant(), topic_name);
+ topic_arn = arn.to_string();
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topic_create"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("CreateTopicResponse", AWS_SNS_NS);
+ f->open_object_section("CreateTopicResult");
+ encode_xml("TopicArn", topic_arn, f);
+ f->close_section(); // CreateTopicResult
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section(); // ResponseMetadata
+ f->close_section(); // CreateTopicResponse
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSCreateTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.create_topic(this, topic_name, dest, topic_arn, opaque_data, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
+// command (AWS compliant):
+// POST
+// Action=ListTopics
+class RGWPSListTopicsOp : public RGWOp {
+private:
+ rgw_pubsub_topics result;
+
+public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield) override;
+
+ const char* name() const override { return "pubsub_topics_list"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPICS_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("ListTopicsResponse", AWS_SNS_NS);
+ f->open_object_section("ListTopicsResult");
+ encode_xml("Topics", result, f);
+ f->close_section(); // ListTopicsResult
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section(); // ResponseMetadat
+ f->close_section(); // ListTopicsResponse
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSListTopicsOp::execute(optional_yield y) {
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topics(this, result, y);
+ // if there are no topics it is not considered an error
+ op_ret = op_ret == -ENOENT ? 0 : op_ret;
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+ if (topics_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topics contain secrets and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully got topics" << dendl;
+}
+
+// command (extension to AWS):
+// POST
+// Action=GetTopic&TopicArn=<topic-arn>
+class RGWPSGetTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_topic result;
+
+ int get_params() {
+ const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!topic_arn || topic_arn->resource.empty()) {
+ ldpp_dout(this, 1) << "GetTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
+ }
+
+ topic_name = topic_arn->resource;
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section("GetTopicResponse");
+ f->open_object_section("GetTopicResult");
+ encode_xml("Topic", result, f);
+ f->close_section();
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section();
+ f->close_section();
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSGetTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topic(this, topic_name, result, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
+// command (AWS compliant):
+// POST
+// Action=GetTopicAttributes&TopicArn=<topic-arn>
+class RGWPSGetTopicAttributesOp : public RGWOp {
+ private:
+ std::string topic_name;
+ rgw_pubsub_topic result;
+
+ int get_params() {
+ const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!topic_arn || topic_arn->resource.empty()) {
+ ldpp_dout(this, 1) << "GetTopicAttribute Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
+ }
+
+ topic_name = topic_arn->resource;
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield y) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_get"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_GET; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("GetTopicAttributesResponse", AWS_SNS_NS);
+ f->open_object_section("GetTopicAttributesResult");
+ result.dump_xml_as_attributes(f);
+ f->close_section(); // GetTopicAttributesResult
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section(); // ResponseMetadata
+ f->close_section(); // GetTopicAttributesResponse
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.get_topic(this, topic_name, result, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (topic_has_endpoint_secret(result) && !verify_transport_security(s->cct, *(s->info.env))) {
+ ldpp_dout(this, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
+// command (AWS compliant):
+// POST
+// Action=DeleteTopic&TopicArn=<topic-arn>
+class RGWPSDeleteTopicOp : public RGWOp {
+ private:
+ std::string topic_name;
+
+ int get_params() {
+ const auto topic_arn = rgw::ARN::parse((s->info.args.get("TopicArn")));
+
+ if (!topic_arn || topic_arn->resource.empty()) {
+ ldpp_dout(this, 1) << "DeleteTopic Action 'TopicArn' argument is missing or invalid" << dendl;
+ return -EINVAL;
+ }
+
+ topic_name = topic_arn->resource;
+
+ // upon deletion it is not known if topic is persistent or not
+ // will try to delete the persistent topic anyway
+ const auto ret = rgw::notify::remove_persistent_topic(topic_name, s->yield);
+ if (ret == -ENOENT) {
+ // topic was not persistent, or already deleted
+ return 0;
+ }
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "DeleteTopic Action failed to remove queue for persistent topics. error:" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+ }
+
+ public:
+ int verify_permission(optional_yield) override {
+ return 0;
+ }
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+ void execute(optional_yield y) override;
+
+ const char* name() const override { return "pubsub_topic_delete"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_TOPIC_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+
+ const auto f = s->formatter;
+ f->open_object_section_in_ns("DeleteTopicResponse", AWS_SNS_NS);
+ f->open_object_section("ResponseMetadata");
+ encode_xml("RequestId", s->req_id, f);
+ f->close_section(); // ResponseMetadata
+ f->close_section(); // DeleteTopicResponse
+ rgw_flush_formatter_and_reset(s, f);
+ }
+};
+
+void RGWPSDeleteTopicOp::execute(optional_yield y) {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ op_ret = ps.remove_topic(this, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
+using op_generator = RGWOp*(*)();
+static const std::unordered_map<std::string, op_generator> op_generators = {
+ {"CreateTopic", []() -> RGWOp* {return new RGWPSCreateTopicOp;}},
+ {"DeleteTopic", []() -> RGWOp* {return new RGWPSDeleteTopicOp;}},
+ {"ListTopics", []() -> RGWOp* {return new RGWPSListTopicsOp;}},
+ {"GetTopic", []() -> RGWOp* {return new RGWPSGetTopicOp;}},
+ {"GetTopicAttributes", []() -> RGWOp* {return new RGWPSGetTopicAttributesOp;}}
+};
+
+bool RGWHandler_REST_PSTopic_AWS::action_exists(const req_state* s)
+{
+ if (s->info.args.exists("Action")) {
+ const std::string action_name = s->info.args.get("Action");
+ return op_generators.contains(action_name);
+ }
+ return false;
+}
+
+RGWOp *RGWHandler_REST_PSTopic_AWS::op_post()
+{
+ s->dialect = "sns";
+ s->prot_flags = RGW_REST_STS;
+
+ if (s->info.args.exists("Action")) {
+ const std::string action_name = s->info.args.get("Action");
+ const auto action_it = op_generators.find(action_name);
+ if (action_it != op_generators.end()) {
+ return action_it->second();
+ }
+ ldpp_dout(s, 10) << "unknown action '" << action_name << "' for Topic handler" << dendl;
+ } else {
+ ldpp_dout(s, 10) << "missing action argument in Topic handler" << dendl;
+ }
+ return nullptr;
+}
+
+int RGWHandler_REST_PSTopic_AWS::authorize(const DoutPrefixProvider* dpp, optional_yield y) {
+ const auto rc = RGW_Auth_S3::authorize(dpp, driver, auth_registry, s, y);
+ if (rc < 0) {
+ return rc;
+ }
+ if (s->auth.identity->is_anonymous()) {
+ ldpp_dout(dpp, 1) << "anonymous user not allowed in topic operations" << dendl;
+ return -ERR_INVALID_REQUEST;
+ }
+ return 0;
+}
+
+namespace {
+// return a unique topic by prefexing with the notification name: <notification>_<topic>
+std::string topic_to_unique(const std::string& topic, const std::string& notification) {
+ return notification + "_" + topic;
+}
+
+// extract the topic from a unique topic of the form: <notification>_<topic>
+[[maybe_unused]] std::string unique_to_topic(const std::string& unique_topic, const std::string& notification) {
+ if (unique_topic.find(notification + "_") == std::string::npos) {
+ return "";
+ }
+ return unique_topic.substr(notification.length() + 1);
+}
+
+// from list of bucket topics, find the one that was auto-generated by a notification
+auto find_unique_topic(const rgw_pubsub_bucket_topics& bucket_topics, const std::string& notif_name) {
+ auto it = std::find_if(bucket_topics.topics.begin(), bucket_topics.topics.end(), [&](const auto& val) { return notif_name == val.second.s3_id; });
+ return it != bucket_topics.topics.end() ?
+ std::optional<std::reference_wrapper<const rgw_pubsub_topic_filter>>(it->second):
+ std::nullopt;
+}
+}
+
+int remove_notification_by_topic(const DoutPrefixProvider *dpp, const std::string& topic_name, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
+ int op_ret = b.remove_notification(dpp, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to remove notification of topic '" << topic_name << "', ret=" << op_ret << dendl;
+ }
+ op_ret = ps.remove_topic(dpp, topic_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(dpp, 1) << "failed to remove auto-generated topic '" << topic_name << "', ret=" << op_ret << dendl;
+ }
+ return op_ret;
+}
+
+int delete_all_notifications(const DoutPrefixProvider *dpp, const rgw_pubsub_bucket_topics& bucket_topics, const RGWPubSub::Bucket& b, optional_yield y, const RGWPubSub& ps) {
+ // delete all notifications of on a bucket
+ for (const auto& topic : bucket_topics.topics) {
+ const auto op_ret = remove_notification_by_topic(dpp, topic.first, b, y, ps);
+ if (op_ret < 0) {
+ return op_ret;
+ }
+ }
+ return 0;
+}
+
+// command (S3 compliant): PUT /<bucket name>?notification
+// a "notification" and a subscription will be auto-generated
+// actual configuration is XML encoded in the body of the message
+class RGWPSCreateNotifOp : public RGWDefaultResponseOp {
+ int verify_params() override {
+ bool exists;
+ const auto no_value = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (no_value.length() > 0) {
+ ldpp_dout(this, 1) << "param 'notification' should not have any value" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+ int get_params_from_body(rgw_pubsub_s3_notifications& configurations) {
+ const auto max_size = s->cct->_conf->rgw_max_put_param_size;
+ int r;
+ bufferlist data;
+ std::tie(r, data) = read_all_input(s, max_size, false);
+
+ if (r < 0) {
+ ldpp_dout(this, 1) << "failed to read XML payload" << dendl;
+ return r;
+ }
+ if (data.length() == 0) {
+ ldpp_dout(this, 1) << "XML payload missing" << dendl;
+ return -EINVAL;
+ }
+
+ RGWXMLDecoder::XMLParser parser;
+
+ if (!parser.init()){
+ ldpp_dout(this, 1) << "failed to initialize XML parser" << dendl;
+ return -EINVAL;
+ }
+ if (!parser.parse(data.c_str(), data.length(), 1)) {
+ ldpp_dout(this, 1) << "failed to parse XML payload" << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ try {
+ // NotificationConfigurations is mandatory
+ // It can be empty which means we delete all the notifications
+ RGWXMLDecoder::decode_xml("NotificationConfiguration", configurations, &parser, true);
+ } catch (RGWXMLDecoder::err& err) {
+ ldpp_dout(this, 1) << "failed to parse XML payload. error: " << err << dendl;
+ return -ERR_MALFORMED_XML;
+ }
+ return 0;
+ }
+public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_create_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_CREATE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_WRITE; }
+
+
+ void execute(optional_yield) override;
+};
+
+void RGWPSCreateNotifOp::execute(optional_yield y) {
+ op_ret = verify_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ rgw_pubsub_s3_notifications configurations;
+ op_ret = get_params_from_body(configurations);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" <<
+ (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
+ "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+
+ if(configurations.list.empty()) {
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b.get_topics(this, bucket_topics, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
+ return;
+ }
+
+ for (const auto& c : configurations.list) {
+ const auto& notif_name = c.id;
+ if (notif_name.empty()) {
+ ldpp_dout(this, 1) << "missing notification id" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ if (c.topic_arn.empty()) {
+ ldpp_dout(this, 1) << "missing topic ARN in notification: '" << notif_name << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ const auto arn = rgw::ARN::parse(c.topic_arn);
+ if (!arn || arn->resource.empty()) {
+ ldpp_dout(this, 1) << "topic ARN has invalid format: '" << c.topic_arn << "' in notification: '" << notif_name << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ if (std::find(c.events.begin(), c.events.end(), rgw::notify::UnknownEvent) != c.events.end()) {
+ ldpp_dout(this, 1) << "unknown event type in notification: '" << notif_name << "'" << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ const auto topic_name = arn->resource;
+
+ // get topic information. destination information is stored in the topic
+ rgw_pubsub_topic topic_info;
+ op_ret = ps.get_topic(this, topic_name, topic_info, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ // make sure that full topic configuration match
+ // TODO: use ARN match function
+
+ // create unique topic name. this has 2 reasons:
+ // (1) topics cannot be shared between different S3 notifications because they hold the filter information
+ // (2) make topic clneaup easier, when notification is removed
+ const auto unique_topic_name = topic_to_unique(topic_name, notif_name);
+ // generate the internal topic. destination is stored here for the "push-only" case
+ // when no subscription exists
+ // ARN is cached to make the "GET" method faster
+ op_ret = ps.create_topic(this, unique_topic_name, topic_info.dest, topic_info.arn, topic_info.opaque_data, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to auto-generate unique topic '" << unique_topic_name <<
+ "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully auto-generated unique topic '" << unique_topic_name << "'" << dendl;
+ // generate the notification
+ rgw::notify::EventTypeList events;
+ op_ret = b.create_notification(this, unique_topic_name, c.events, std::make_optional(c.filter), notif_name, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to auto-generate notification for unique topic '" << unique_topic_name <<
+ "', ret=" << op_ret << dendl;
+ // rollback generated topic (ignore return value)
+ ps.remove_topic(this, unique_topic_name, y);
+ return;
+ }
+ ldpp_dout(this, 20) << "successfully auto-generated notification for unique topic '" << unique_topic_name << "'" << dendl;
+ }
+}
+
+int RGWPSCreateNotifOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+// command (extension to S3): DELETE /bucket?notification[=<notification-id>]
+class RGWPSDeleteNotifOp : public RGWDefaultResponseOp {
+ int get_params(std::string& notif_name) const {
+ bool exists;
+ notif_name = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notification_delete_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_DELETE; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_DELETE; }
+
+ void execute(optional_yield y) override;
+};
+
+void RGWPSDeleteNotifOp::execute(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" <<
+ (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
+ "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b.get_topics(this, bucket_topics, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+
+ if (!notif_name.empty()) {
+ // delete a specific notification
+ const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+ if (unique_topic) {
+ const auto unique_topic_name = unique_topic->get().topic.name;
+ op_ret = remove_notification_by_topic(this, unique_topic_name, b, y, ps);
+ return;
+ }
+ // notification to be removed is not found - considered success
+ ldpp_dout(this, 20) << "notification '" << notif_name << "' already removed" << dendl;
+ return;
+ }
+
+ op_ret = delete_all_notifications(this, bucket_topics, b, y, ps);
+}
+
+int RGWPSDeleteNotifOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3PutBucketNotification)) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+// command (S3 compliant): GET /bucket?notification[=<notification-id>]
+class RGWPSListNotifsOp : public RGWOp {
+ rgw_pubsub_s3_notifications notifications;
+
+ int get_params(std::string& notif_name) const {
+ bool exists;
+ notif_name = s->info.args.get("notification", &exists);
+ if (!exists) {
+ ldpp_dout(this, 1) << "missing required param 'notification'" << dendl;
+ return -EINVAL;
+ }
+ if (s->bucket_name.empty()) {
+ ldpp_dout(this, 1) << "request must be on a bucket" << dendl;
+ return -EINVAL;
+ }
+ return 0;
+ }
+
+public:
+ int verify_permission(optional_yield y) override;
+
+ void pre_exec() override {
+ rgw_bucket_object_pre_exec(s);
+ }
+
+ const char* name() const override { return "pubsub_notifications_get_s3"; }
+ RGWOpType get_type() override { return RGW_OP_PUBSUB_NOTIF_LIST; }
+ uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
+
+ void execute(optional_yield y) override;
+ void send_response() override {
+ if (op_ret) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ end_header(s, this, "application/xml");
+
+ if (op_ret < 0) {
+ return;
+ }
+ notifications.dump_xml(s->formatter);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+};
+
+void RGWPSListNotifsOp::execute(optional_yield y) {
+ std::string notif_name;
+ op_ret = get_params(notif_name);
+ if (op_ret < 0) {
+ return;
+ }
+
+ std::unique_ptr<rgw::sal::User> user = driver->get_user(s->owner.get_id());
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ op_ret = driver->get_bucket(this, user.get(), s->bucket_tenant, s->bucket_name, &bucket, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get bucket '" <<
+ (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) <<
+ "' info, ret = " << op_ret << dendl;
+ return;
+ }
+
+ const RGWPubSub ps(driver, s->owner.get_id().tenant);
+ const RGWPubSub::Bucket b(ps, bucket.get());
+
+ // get all topics on a bucket
+ rgw_pubsub_bucket_topics bucket_topics;
+ op_ret = b.get_topics(this, bucket_topics, y);
+ if (op_ret < 0) {
+ ldpp_dout(this, 1) << "failed to get list of topics from bucket '" << s->bucket_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ if (!notif_name.empty()) {
+ // get info of a specific notification
+ const auto unique_topic = find_unique_topic(bucket_topics, notif_name);
+ if (unique_topic) {
+ notifications.list.emplace_back(unique_topic->get());
+ return;
+ }
+ op_ret = -ENOENT;
+ ldpp_dout(this, 1) << "failed to get notification info for '" << notif_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ // loop through all topics of the bucket
+ for (const auto& topic : bucket_topics.topics) {
+ if (topic.second.s3_id.empty()) {
+ // not an s3 notification
+ continue;
+ }
+ notifications.list.emplace_back(topic.second);
+ }
+}
+
+int RGWPSListNotifsOp::verify_permission(optional_yield y) {
+ if (!verify_bucket_permission(this, s, rgw::IAM::s3GetBucketNotification)) {
+ return -EACCES;
+ }
+
+ return 0;
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_get() {
+ return new RGWPSListNotifsOp();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_put() {
+ return new RGWPSCreateNotifOp();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::op_delete() {
+ return new RGWPSDeleteNotifOp();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_get_op() {
+ return new RGWPSListNotifsOp();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_put_op() {
+ return new RGWPSCreateNotifOp();
+}
+
+RGWOp* RGWHandler_REST_PSNotifs_S3::create_delete_op() {
+ return new RGWPSDeleteNotifOp();
+}
+