summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_rest_pubsub_common.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/rgw/rgw_rest_pubsub_common.cc
parentInitial commit. (diff)
downloadceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.tar.xz
ceph-483eb2f56657e8e7f419ab1a4fab8dce9ade8609.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_rest_pubsub_common.cc')
-rw-r--r--src/rgw/rgw_rest_pubsub_common.cc259
1 files changed, 259 insertions, 0 deletions
diff --git a/src/rgw/rgw_rest_pubsub_common.cc b/src/rgw/rgw_rest_pubsub_common.cc
new file mode 100644
index 00000000..3b5de53f
--- /dev/null
+++ b/src/rgw/rgw_rest_pubsub_common.cc
@@ -0,0 +1,259 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "rgw_common.h"
+#include "rgw_rest_pubsub_common.h"
+#include "common/dout.h"
+#include "rgw_url.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_rgw
+
+bool validate_and_update_endpoint_secret(rgw_pubsub_sub_dest& dest, CephContext *cct, const RGWEnv& env) {
+ if (dest.push_endpoint.empty()) {
+ return true;
+ }
+ std::string user;
+ std::string password;
+ if (!rgw::parse_url_userinfo(dest.push_endpoint, user, password)) {
+ ldout(cct, 1) << "endpoint validation error: malformed endpoint URL:" << dest.push_endpoint << dendl;
+ return false;
+ }
+ // this should be verified inside parse_url()
+ ceph_assert(user.empty() == password.empty());
+ if (!user.empty()) {
+ dest.stored_secret = true;
+ if (!rgw_transport_is_secure(cct, env)) {
+ ldout(cct, 1) << "endpoint validation error: sending password over insecure transport" << dendl;
+ return false;
+ }
+ }
+ return true;
+}
+
+bool subscription_has_endpoint_secret(const rgw_pubsub_sub_config& sub) {
+ return sub.dest.stored_secret;
+}
+
+bool topic_has_endpoint_secret(const rgw_pubsub_topic_subs& topic) {
+ return topic.topic.dest.stored_secret;
+}
+
+bool topics_has_endpoint_secret(const rgw_pubsub_user_topics& topics) {
+ for (const auto& topic : topics.topics) {
+ if (topic_has_endpoint_secret(topic.second)) return true;
+ }
+ return false;
+}
+void RGWPSCreateTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->create_topic(topic_name, dest, topic_arn, opaque_data);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to create topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully created topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSListTopicsOp::execute() {
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->get_user_topics(&result);
+ // if there are no topics it is not considered an error
+ op_ret = op_ret == -ENOENT ? 0 : op_ret;
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topics, ret=" << op_ret << dendl;
+ return;
+ }
+ if (topics_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "topics contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got topics" << dendl;
+}
+
+void RGWPSGetTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->get_topic(topic_name, &result);
+ if (topic_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "topic '" << topic_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get topic '" << topic_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 1) << "successfully got topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSDeleteTopicOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ op_ret = ups->remove_topic(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove topic '" << topic_name << ", ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 1) << "successfully removed topic '" << topic_name << "'" << dendl;
+}
+
+void RGWPSCreateSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->subscribe(topic_name, dest);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to create subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully created subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSGetSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->get_conf(&result);
+ if (subscription_has_endpoint_secret(result) && !rgw_transport_is_secure(s->cct, *(s->info.env))) {
+ ldout(s->cct, 1) << "subscription '" << sub_name << "' contain secret and cannot be sent over insecure transport" << dendl;
+ op_ret = -EPERM;
+ return;
+ }
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSDeleteSubOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub(sub_name);
+ op_ret = sub->unsubscribe(topic_name);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to remove subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully removed subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSAckSubEventOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ auto sub = ups->get_sub_with_events(sub_name);
+ op_ret = sub->remove_event(event_id);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to ack event on subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully acked event on subscription '" << sub_name << "'" << dendl;
+}
+
+void RGWPSPullSubEventsOp::execute() {
+ op_ret = get_params();
+ if (op_ret < 0) {
+ return;
+ }
+ ups.emplace(store, s->owner.get_id());
+ sub = ups->get_sub_with_events(sub_name);
+ if (!sub) {
+ op_ret = -ENOENT;
+ ldout(s->cct, 1) << "failed to get subscription '" << sub_name << "' for events, ret=" << op_ret << dendl;
+ return;
+ }
+ op_ret = sub->list_events(marker, max_entries);
+ if (op_ret < 0) {
+ ldout(s->cct, 1) << "failed to get events from subscription '" << sub_name << "', ret=" << op_ret << dendl;
+ return;
+ }
+ ldout(s->cct, 20) << "successfully got events from subscription '" << sub_name << "'" << dendl;
+}
+
+
+int RGWPSCreateNotifOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ const auto& id = s->owner.get_id();
+
+ ret = store->get_bucket_info(*s->sysobj_ctx, id.tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ ldout(s->cct, 1) << "failed to get bucket info, cannot verify ownership" << dendl;
+ return ret;
+ }
+
+ if (bucket_info.owner != id) {
+ ldout(s->cct, 1) << "user doesn't own bucket, not allowed to create notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
+int RGWPSDeleteNotifOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot remove notification" << dendl;
+ return -EPERM;
+ }
+ return 0;
+}
+
+int RGWPSListNotifsOp::verify_permission() {
+ int ret = get_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = store->get_bucket_info(*s->sysobj_ctx, s->owner.get_id().tenant, bucket_name,
+ bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (bucket_info.owner != s->owner.get_id()) {
+ ldout(s->cct, 1) << "user doesn't own bucket, cannot get notification list" << dendl;
+ return -EPERM;
+ }
+
+ return 0;
+}
+