summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_process.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_process.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_process.cc')
-rw-r--r--src/rgw/rgw_process.cc472
1 files changed, 472 insertions, 0 deletions
diff --git a/src/rgw/rgw_process.cc b/src/rgw/rgw_process.cc
new file mode 100644
index 000000000..8d20251f8
--- /dev/null
+++ b/src/rgw/rgw_process.cc
@@ -0,0 +1,472 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "common/errno.h"
+#include "common/Throttle.h"
+#include "common/WorkQueue.h"
+#include "include/scope_guard.h"
+
+#include <utility>
+#include "rgw_auth_registry.h"
+#include "rgw_dmclock_scheduler.h"
+#include "rgw_rest.h"
+#include "rgw_frontend.h"
+#include "rgw_request.h"
+#include "rgw_process.h"
+#include "rgw_loadgen.h"
+#include "rgw_client_io.h"
+#include "rgw_opa.h"
+#include "rgw_perf_counters.h"
+#include "rgw_lua.h"
+#include "rgw_lua_request.h"
+#include "rgw_tracer.h"
+#include "rgw_ratelimit.h"
+
+#include "services/svc_zone_utils.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+using namespace std;
+using rgw::dmclock::Scheduler;
+
+void RGWProcess::RGWWQ::_dump_queue()
+{
+ if (!g_conf()->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+ return;
+ }
+ deque<RGWRequest *>::iterator iter;
+ if (process->m_req_queue.empty()) {
+ dout(20) << "RGWWQ: empty" << dendl;
+ return;
+ }
+ dout(20) << "RGWWQ:" << dendl;
+ for (iter = process->m_req_queue.begin();
+ iter != process->m_req_queue.end(); ++iter) {
+ dout(20) << "req: " << hex << *iter << dec << dendl;
+ }
+} /* RGWProcess::RGWWQ::_dump_queue */
+
+auto schedule_request(Scheduler *scheduler, req_state *s, RGWOp *op)
+{
+ using rgw::dmclock::SchedulerCompleter;
+ if (!scheduler)
+ return std::make_pair(0,SchedulerCompleter{});
+
+ const auto client = op->dmclock_client();
+ const auto cost = op->dmclock_cost();
+ if (s->cct->_conf->subsys.should_gather(ceph_subsys_rgw, 10)) {
+ ldpp_dout(op,10) << "scheduling with "
+ << s->cct->_conf.get_val<std::string>("rgw_scheduler_type")
+ << " client=" << static_cast<int>(client)
+ << " cost=" << cost << dendl;
+ }
+ return scheduler->schedule_request(client, {},
+ req_state::Clock::to_double(s->time),
+ cost,
+ s->yield);
+}
+
+bool RGWProcess::RGWWQ::_enqueue(RGWRequest* req) {
+ process->m_req_queue.push_back(req);
+ perfcounter->inc(l_rgw_qlen);
+ dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return true;
+}
+
+RGWRequest* RGWProcess::RGWWQ::_dequeue() {
+ if (process->m_req_queue.empty())
+ return NULL;
+ RGWRequest *req = process->m_req_queue.front();
+ process->m_req_queue.pop_front();
+ dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ perfcounter->inc(l_rgw_qlen, -1);
+ return req;
+}
+
+void RGWProcess::RGWWQ::_process(RGWRequest *req, ThreadPool::TPHandle &) {
+ perfcounter->inc(l_rgw_qactive);
+ process->handle_request(this, req);
+ process->req_throttle.put(1);
+ perfcounter->inc(l_rgw_qactive, -1);
+}
+bool rate_limit(rgw::sal::Driver* driver, req_state* s) {
+ // we dont want to limit health check or system or admin requests
+ const auto& is_admin_or_system = s->user->get_info();
+ if ((s->op_type == RGW_OP_GET_HEALTH_CHECK) || is_admin_or_system.admin || is_admin_or_system.system)
+ return false;
+ std::string userfind;
+ RGWRateLimitInfo global_user;
+ RGWRateLimitInfo global_bucket;
+ RGWRateLimitInfo global_anon;
+ RGWRateLimitInfo* bucket_ratelimit;
+ RGWRateLimitInfo* user_ratelimit;
+ driver->get_ratelimit(global_bucket, global_user, global_anon);
+ bucket_ratelimit = &global_bucket;
+ user_ratelimit = &global_user;
+ s->user->get_id().to_str(userfind);
+ userfind = "u" + userfind;
+ s->ratelimit_user_name = userfind;
+ std::string bucketfind = !rgw::sal::Bucket::empty(s->bucket.get()) ? "b" + s->bucket->get_marker() : "";
+ s->ratelimit_bucket_marker = bucketfind;
+ const char *method = s->info.method;
+
+ auto iter = s->user->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != s->user->get_attrs().end()) {
+ try {
+ RGWRateLimitInfo user_ratelimit_temp;
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(user_ratelimit_temp, biter);
+ // override global rate limiting only if local rate limiting is enabled
+ if (user_ratelimit_temp.enabled)
+ *user_ratelimit = user_ratelimit_temp;
+ } catch (buffer::error& err) {
+ ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ if (s->user->get_id().id == RGW_USER_ANON_ID && global_anon.enabled) {
+ *user_ratelimit = global_anon;
+ }
+ bool limit_bucket = false;
+ bool limit_user = s->ratelimit_data->should_rate_limit(method, s->ratelimit_user_name, s->time, user_ratelimit);
+
+ if(!rgw::sal::Bucket::empty(s->bucket.get()))
+ {
+ iter = s->bucket->get_attrs().find(RGW_ATTR_RATELIMIT);
+ if(iter != s->bucket->get_attrs().end()) {
+ try {
+ RGWRateLimitInfo bucket_ratelimit_temp;
+ bufferlist& bl = iter->second;
+ auto biter = bl.cbegin();
+ decode(bucket_ratelimit_temp, biter);
+ // override global rate limiting only if local rate limiting is enabled
+ if (bucket_ratelimit_temp.enabled)
+ *bucket_ratelimit = bucket_ratelimit_temp;
+ } catch (buffer::error& err) {
+ ldpp_dout(s, 0) << "ERROR: failed to decode rate limit" << dendl;
+ return -EIO;
+ }
+ }
+ if (!limit_user) {
+ limit_bucket = s->ratelimit_data->should_rate_limit(method, s->ratelimit_bucket_marker, s->time, bucket_ratelimit);
+ }
+ }
+ if(limit_bucket && !limit_user) {
+ s->ratelimit_data->giveback_tokens(method, s->ratelimit_user_name);
+ }
+ s->user_ratelimit = *user_ratelimit;
+ s->bucket_ratelimit = *bucket_ratelimit;
+ return (limit_user || limit_bucket);
+}
+
+int rgw_process_authenticated(RGWHandler_REST * const handler,
+ RGWOp *& op,
+ RGWRequest * const req,
+ req_state * const s,
+ optional_yield y,
+ rgw::sal::Driver* driver,
+ const bool skip_retarget)
+{
+ ldpp_dout(op, 2) << "init permissions" << dendl;
+ int ret = handler->init_permissions(op, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ /**
+ * Only some accesses support website mode, and website mode does NOT apply
+ * if you are using the REST endpoint either (ergo, no authenticated access)
+ */
+ if (! skip_retarget) {
+ ldpp_dout(op, 2) << "recalculating target" << dendl;
+ ret = handler->retarget(op, &op, y);
+ if (ret < 0) {
+ return ret;
+ }
+ req->op = op;
+ } else {
+ ldpp_dout(op, 2) << "retargeting skipped because of SubOp mode" << dendl;
+ }
+
+ /* If necessary extract object ACL and put them into req_state. */
+ ldpp_dout(op, 2) << "reading permissions" << dendl;
+ ret = handler->read_permissions(op, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ldpp_dout(op, 2) << "init op" << dendl;
+ ret = op->init_processing(y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ldpp_dout(op, 2) << "verifying op mask" << dendl;
+ ret = op->verify_op_mask();
+ if (ret < 0) {
+ return ret;
+ }
+
+ /* Check if OPA is used to authorize requests */
+ if (s->cct->_conf->rgw_use_opa_authz) {
+ ret = rgw_opa_authorize(op, s);
+ if (ret < 0) {
+ return ret;
+ }
+ }
+
+ ldpp_dout(op, 2) << "verifying op permissions" << dendl;
+ {
+ auto span = tracing::rgw::tracer.add_span("verify_permission", s->trace);
+ std::swap(span, s->trace);
+ ret = op->verify_permission(y);
+ std::swap(span, s->trace);
+ }
+ if (ret < 0) {
+ if (s->system_request) {
+ dout(2) << "overriding permissions due to system operation" << dendl;
+ } else if (s->auth.identity->is_admin_of(s->user->get_id())) {
+ dout(2) << "overriding permissions due to admin operation" << dendl;
+ } else {
+ return ret;
+ }
+ }
+
+ ldpp_dout(op, 2) << "verifying op params" << dendl;
+ ret = op->verify_params();
+ if (ret < 0) {
+ return ret;
+ }
+
+ ldpp_dout(op, 2) << "pre-executing" << dendl;
+ op->pre_exec();
+
+ ldpp_dout(op, 2) << "check rate limiting" << dendl;
+ if (rate_limit(driver, s)) {
+ return -ERR_RATE_LIMITED;
+ }
+ ldpp_dout(op, 2) << "executing" << dendl;
+ {
+ auto span = tracing::rgw::tracer.add_span("execute", s->trace);
+ std::swap(span, s->trace);
+ op->execute(y);
+ std::swap(span, s->trace);
+ }
+
+ ldpp_dout(op, 2) << "completing" << dendl;
+ op->complete();
+
+ return 0;
+}
+
+int process_request(const RGWProcessEnv& penv,
+ RGWRequest* const req,
+ const std::string& frontend_prefix,
+ RGWRestfulIO* const client_io,
+ optional_yield yield,
+ rgw::dmclock::Scheduler *scheduler,
+ string* user,
+ ceph::coarse_real_clock::duration* latency,
+ int* http_ret)
+{
+ int ret = client_io->init(g_ceph_context);
+ dout(1) << "====== starting new request req=" << hex << req << dec
+ << " =====" << dendl;
+ perfcounter->inc(l_rgw_req);
+
+ RGWEnv& rgw_env = client_io->get_env();
+
+ req_state rstate(g_ceph_context, penv, &rgw_env, req->id);
+ req_state *s = &rstate;
+
+ s->ratelimit_data = penv.ratelimiting->get_active();
+
+ rgw::sal::Driver* driver = penv.driver;
+ std::unique_ptr<rgw::sal::User> u = driver->get_user(rgw_user());
+ s->set_user(u);
+
+ if (ret < 0) {
+ s->cio = client_io;
+ abort_early(s, nullptr, ret, nullptr, yield);
+ return ret;
+ }
+
+ s->req_id = driver->zone_unique_id(req->id);
+ s->trans_id = driver->zone_unique_trans_id(req->id);
+ s->host_id = driver->get_host_id();
+ s->yield = yield;
+
+ ldpp_dout(s, 2) << "initializing for trans_id = " << s->trans_id << dendl;
+
+ RGWOp* op = nullptr;
+ int init_error = 0;
+ bool should_log = false;
+ RGWREST* rest = penv.rest;
+ RGWRESTMgr *mgr;
+ RGWHandler_REST *handler = rest->get_handler(driver, s,
+ *penv.auth_registry,
+ frontend_prefix,
+ client_io, &mgr, &init_error);
+ rgw::dmclock::SchedulerCompleter c;
+
+ if (init_error != 0) {
+ abort_early(s, nullptr, init_error, nullptr, yield);
+ goto done;
+ }
+ ldpp_dout(s, 10) << "handler=" << typeid(*handler).name() << dendl;
+
+ should_log = mgr->get_logging();
+
+ ldpp_dout(s, 2) << "getting op " << s->op << dendl;
+ op = handler->get_op();
+ if (!op) {
+ abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler, yield);
+ goto done;
+ }
+ {
+ s->trace_enabled = tracing::rgw::tracer.is_enabled();
+ std::string script;
+ auto rc = rgw::lua::read_script(s, penv.lua.manager.get(), s->bucket_tenant, s->yield, rgw::lua::context::preRequest, script);
+ if (rc == -ENOENT) {
+ // no script, nothing to do
+ } else if (rc < 0) {
+ ldpp_dout(op, 5) << "WARNING: failed to read pre request script. error: " << rc << dendl;
+ } else {
+ rc = rgw::lua::request::execute(driver, rest, penv.olog, s, op, script);
+ if (rc < 0) {
+ ldpp_dout(op, 5) << "WARNING: failed to execute pre request script. error: " << rc << dendl;
+ }
+ }
+ }
+ std::tie(ret,c) = schedule_request(scheduler, s, op);
+ if (ret < 0) {
+ if (ret == -EAGAIN) {
+ ret = -ERR_RATE_LIMITED;
+ }
+ ldpp_dout(op,0) << "Scheduling request failed with " << ret << dendl;
+ abort_early(s, op, ret, handler, yield);
+ goto done;
+ }
+ req->op = op;
+ ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;
+ s->op_type = op->get_type();
+
+ try {
+ ldpp_dout(op, 2) << "verifying requester" << dendl;
+ ret = op->verify_requester(*penv.auth_registry, yield);
+ if (ret < 0) {
+ dout(10) << "failed to authorize request" << dendl;
+ abort_early(s, op, ret, handler, yield);
+ goto done;
+ }
+
+ /* FIXME: remove this after switching all handlers to the new authentication
+ * infrastructure. */
+ if (nullptr == s->auth.identity) {
+ s->auth.identity = rgw::auth::transform_old_authinfo(s);
+ }
+
+ ldpp_dout(op, 2) << "normalizing buckets and tenants" << dendl;
+ ret = handler->postauth_init(yield);
+ if (ret < 0) {
+ dout(10) << "failed to run post-auth init" << dendl;
+ abort_early(s, op, ret, handler, yield);
+ goto done;
+ }
+
+ if (s->user->get_info().suspended) {
+ dout(10) << "user is suspended, uid=" << s->user->get_id() << dendl;
+ abort_early(s, op, -ERR_USER_SUSPENDED, handler, yield);
+ goto done;
+ }
+
+
+ const auto trace_name = std::string(op->name()) + " " + s->trans_id;
+ s->trace = tracing::rgw::tracer.start_trace(trace_name, s->trace_enabled);
+ s->trace->SetAttribute(tracing::rgw::OP, op->name());
+ s->trace->SetAttribute(tracing::rgw::TYPE, tracing::rgw::REQUEST);
+
+ ret = rgw_process_authenticated(handler, op, req, s, yield, driver);
+ if (ret < 0) {
+ abort_early(s, op, ret, handler, yield);
+ goto done;
+ }
+ } catch (const ceph::crypto::DigestException& e) {
+ dout(0) << "authentication failed" << e.what() << dendl;
+ abort_early(s, op, -ERR_INVALID_SECRET_KEY, handler, yield);
+ }
+
+done:
+ if (op) {
+ if (s->trace) {
+ s->trace->SetAttribute(tracing::rgw::RETURN, op->get_ret());
+ if (!rgw::sal::User::empty(s->user)) {
+ s->trace->SetAttribute(tracing::rgw::USER_ID, s->user->get_id().id);
+ }
+ if (!rgw::sal::Bucket::empty(s->bucket)) {
+ s->trace->SetAttribute(tracing::rgw::BUCKET_NAME, s->bucket->get_name());
+ }
+ if (!rgw::sal::Object::empty(s->object)) {
+ s->trace->SetAttribute(tracing::rgw::OBJECT_NAME, s->object->get_name());
+ }
+ }
+ std::string script;
+ auto rc = rgw::lua::read_script(s, penv.lua.manager.get(), s->bucket_tenant, s->yield, rgw::lua::context::postRequest, script);
+ if (rc == -ENOENT) {
+ // no script, nothing to do
+ } else if (rc < 0) {
+ ldpp_dout(op, 5) << "WARNING: failed to read post request script. error: " << rc << dendl;
+ } else {
+ rc = rgw::lua::request::execute(driver, rest, penv.olog, s, op, script);
+ if (rc < 0) {
+ ldpp_dout(op, 5) << "WARNING: failed to execute post request script. error: " << rc << dendl;
+ }
+ }
+ }
+
+ try {
+ client_io->complete_request();
+ } catch (rgw::io::Exception& e) {
+ dout(0) << "ERROR: client_io->complete_request() returned "
+ << e.what() << dendl;
+ }
+ if (should_log) {
+ rgw_log_op(rest, s, op, penv.olog);
+ }
+
+ if (http_ret != nullptr) {
+ *http_ret = s->err.http_ret;
+ }
+ int op_ret = 0;
+
+ if (user && !rgw::sal::User::empty(s->user.get())) {
+ *user = s->user->get_id().to_str();
+ }
+
+ if (op) {
+ op_ret = op->get_ret();
+ ldpp_dout(op, 2) << "op status=" << op_ret << dendl;
+ ldpp_dout(op, 2) << "http status=" << s->err.http_ret << dendl;
+ } else {
+ ldpp_dout(s, 2) << "http status=" << s->err.http_ret << dendl;
+ }
+ if (handler)
+ handler->put_op(op);
+ rest->put_handler(handler);
+
+ const auto lat = s->time_elapsed();
+ if (latency) {
+ *latency = lat;
+ }
+ dout(1) << "====== req done req=" << hex << req << dec
+ << " op status=" << op_ret
+ << " http_status=" << s->err.http_ret
+ << " latency=" << lat
+ << " ======"
+ << dendl;
+
+ return (ret < 0 ? ret : s->err.ret);
+} /* process_request */