summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_dmclock_async_scheduler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_dmclock_async_scheduler.cc')
-rw-r--r--src/rgw/rgw_dmclock_async_scheduler.cc183
1 files changed, 183 insertions, 0 deletions
diff --git a/src/rgw/rgw_dmclock_async_scheduler.cc b/src/rgw/rgw_dmclock_async_scheduler.cc
new file mode 100644
index 000000000..28738e9f3
--- /dev/null
+++ b/src/rgw/rgw_dmclock_async_scheduler.cc
@@ -0,0 +1,183 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "common/async/completion.h"
+#include "rgw_dmclock_async_scheduler.h"
+#include "rgw_dmclock_scheduler.h"
+
+namespace rgw::dmclock {
+
+AsyncScheduler::~AsyncScheduler()
+{
+ cancel();
+ if (observer) {
+ cct->_conf.remove_observer(this);
+ }
+}
+
+const char** AsyncScheduler::get_tracked_conf_keys() const
+{
+ if (observer) {
+ return observer->get_tracked_conf_keys();
+ }
+ static const char* keys[] = { "rgw_max_concurrent_requests", nullptr };
+ return keys;
+}
+
+void AsyncScheduler::handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string>& changed)
+{
+ if (observer) {
+ observer->handle_conf_change(conf, changed);
+ }
+ if (changed.count("rgw_max_concurrent_requests")) {
+ auto new_max = conf.get_val<int64_t>("rgw_max_concurrent_requests");
+ max_requests = new_max > 0 ? new_max : std::numeric_limits<int64_t>::max();
+ }
+ queue.update_client_infos();
+ schedule(crimson::dmclock::TimeZero);
+}
+
+int AsyncScheduler::schedule_request_impl(const client_id& client,
+ const ReqParams& params,
+ const Time& time, const Cost& cost,
+ optional_yield yield_ctx)
+{
+ ceph_assert(yield_ctx);
+
+ auto &yield = yield_ctx.get_yield_context();
+ boost::system::error_code ec;
+ async_request(client, params, time, cost, yield[ec]);
+
+ if (ec){
+ if (ec == boost::system::errc::resource_unavailable_try_again)
+ return -EAGAIN;
+ else
+ return -ec.value();
+ }
+
+ return 0;
+}
+
+void AsyncScheduler::request_complete()
+{
+ --outstanding_requests;
+ if(auto c = counters(client_id::count)){
+ c->inc(throttle_counters::l_outstanding, -1);
+ }
+ schedule(crimson::dmclock::TimeZero);
+}
+
+void AsyncScheduler::cancel()
+{
+ ClientSums sums;
+
+ queue.remove_by_req_filter([&] (RequestRef&& request) {
+ inc(sums, request->client, request->cost);
+ auto c = static_cast<Completion*>(request.release());
+ Completion::dispatch(std::unique_ptr<Completion>{c},
+ boost::asio::error::operation_aborted,
+ PhaseType::priority);
+ return true;
+ });
+ timer.cancel();
+
+ for (size_t i = 0; i < client_count; i++) {
+ if (auto c = counters(static_cast<client_id>(i))) {
+ on_cancel(c, sums[i]);
+ }
+ }
+}
+
+void AsyncScheduler::cancel(const client_id& client)
+{
+ ClientSum sum;
+
+ queue.remove_by_client(client, false, [&] (RequestRef&& request) {
+ sum.count++;
+ sum.cost += request->cost;
+ auto c = static_cast<Completion*>(request.release());
+ Completion::dispatch(std::unique_ptr<Completion>{c},
+ boost::asio::error::operation_aborted,
+ PhaseType::priority);
+ });
+ if (auto c = counters(client)) {
+ on_cancel(c, sum);
+ }
+ schedule(crimson::dmclock::TimeZero);
+}
+
+void AsyncScheduler::schedule(const Time& time)
+{
+ timer.expires_at(Clock::from_double(time));
+ timer.async_wait([this] (boost::system::error_code ec) {
+ // process requests unless the wait was canceled. note that a canceled
+ // wait may execute after this AsyncScheduler destructs
+ if (ec != boost::asio::error::operation_aborted) {
+ process(get_time());
+ }
+ });
+}
+
+void AsyncScheduler::process(const Time& now)
+{
+ // must run in the executor. we should only invoke completion handlers if the
+ // executor is running
+ assert(get_executor().running_in_this_thread());
+
+ ClientSums rsums, psums;
+
+ while (outstanding_requests < max_requests) {
+ auto pull = queue.pull_request(now);
+
+ if (pull.is_none()) {
+ // no pending requests, cancel the timer
+ timer.cancel();
+ break;
+ }
+ if (pull.is_future()) {
+ // update the timer based on the future time
+ schedule(pull.getTime());
+ break;
+ }
+ ++outstanding_requests;
+ if(auto c = counters(client_id::count)){
+ c->inc(throttle_counters::l_outstanding);
+ }
+
+ // complete the request
+ auto& r = pull.get_retn();
+ auto client = r.client;
+ auto phase = r.phase;
+ auto started = r.request->started;
+ auto cost = r.request->cost;
+ auto c = static_cast<Completion*>(r.request.release());
+ Completion::post(std::unique_ptr<Completion>{c},
+ boost::system::error_code{}, phase);
+
+ if (auto c = counters(client)) {
+ auto lat = Clock::from_double(now) - Clock::from_double(started);
+ if (phase == PhaseType::reservation) {
+ inc(rsums, client, cost);
+ c->tinc(queue_counters::l_res_latency, lat);
+ } else {
+ inc(psums, client, cost);
+ c->tinc(queue_counters::l_prio_latency, lat);
+ }
+ }
+ }
+
+ if (outstanding_requests >= max_requests) {
+ if(auto c = counters(client_id::count)){
+ c->inc(throttle_counters::l_throttle);
+ }
+ }
+
+ for (size_t i = 0; i < client_count; i++) {
+ if (auto c = counters(static_cast<client_id>(i))) {
+ on_process(c, rsums[i], psums[i]);
+ }
+ }
+}
+
+} // namespace rgw::dmclock