diff options
Diffstat (limited to 'src/rgw/rgw_dmclock_sync_scheduler.cc')
-rw-r--r-- | src/rgw/rgw_dmclock_sync_scheduler.cc | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/src/rgw/rgw_dmclock_sync_scheduler.cc b/src/rgw/rgw_dmclock_sync_scheduler.cc new file mode 100644 index 00000000..650a995d --- /dev/null +++ b/src/rgw/rgw_dmclock_sync_scheduler.cc @@ -0,0 +1,114 @@ +#include "rgw_dmclock_scheduler.h" +#include "rgw_dmclock_sync_scheduler.h" +#include "rgw_dmclock_scheduler_ctx.h" + +namespace rgw::dmclock { + +SyncScheduler::~SyncScheduler() +{ + cancel(); +} + +int SyncScheduler::add_request(const client_id& client, const ReqParams& params, + const Time& time, Cost cost) +{ + std::mutex req_mtx; + std::condition_variable req_cv; + ReqState rstate {ReqState::Wait}; + auto req = SyncRequest{client, time, cost, req_mtx, req_cv, rstate, counters}; + int r = queue.add_request_time(req, client, params, time, cost); + if (r == 0) { + if (auto c = counters(client)) { + c->inc(queue_counters::l_qlen); + c->inc(queue_counters::l_cost, cost); + } + queue.request_completed(); + // Perform a blocking wait until the request callback is called + { + std::unique_lock lock{req_mtx}; + req_cv.wait(lock, [&rstate] {return rstate != ReqState::Wait;}); + } + if (rstate == ReqState::Cancelled) { + //FIXME: decide on error code for cancelled request + r = -ECONNABORTED; + } + } else { + // post the error code + if (auto c = counters(client)) { + c->inc(queue_counters::l_limit); + c->inc(queue_counters::l_limit_cost, cost); + } + } + return r; +} + +void SyncScheduler::handle_request_cb(const client_id &c, + std::unique_ptr<SyncRequest> req, + PhaseType phase, Cost cost) +{ + { std::lock_guard<std::mutex> lg(req->req_mtx); + req->req_state = ReqState::Ready; + req->req_cv.notify_one(); + } + + if (auto ctr = req->counters(c)) { + auto lat = Clock::from_double(get_time()) - Clock::from_double(req->started); + if (phase == PhaseType::reservation){ + ctr->tinc(queue_counters::l_res_latency, lat); + ctr->inc(queue_counters::l_res); + if (cost) ctr->inc(queue_counters::l_res_cost); + } else if (phase == PhaseType::priority){ + ctr->tinc(queue_counters::l_prio_latency, lat); + ctr->inc(queue_counters::l_prio); + if (cost) ctr->inc(queue_counters::l_prio_cost); + } + ctr->dec(queue_counters::l_qlen); + if (cost) ctr->dec(queue_counters::l_cost); + } +} + + +void SyncScheduler::cancel(const client_id& client) +{ + ClientSum sum; + + queue.remove_by_client(client, false, [&](RequestRef&& request) + { + sum.count++; + sum.cost += request->cost; + { + std::lock_guard <std::mutex> lg(request->req_mtx); + request->req_state = ReqState::Cancelled; + request->req_cv.notify_one(); + } + }); + if (auto c = counters(client)) { + on_cancel(c, sum); + } + + queue.request_completed(); +} + +void SyncScheduler::cancel() +{ + ClientSums sums; + + queue.remove_by_req_filter([&](RequestRef&& request) -> bool + { + inc(sums, request->client, request->cost); + { + std::lock_guard<std::mutex> lg(request->req_mtx); + request->req_state = ReqState::Cancelled; + request->req_cv.notify_one(); + } + return true; + }); + + for (size_t i = 0; i < client_count; i++) { + if (auto c = counters(static_cast<client_id>(i))) { + on_cancel(c, sums[i]); + } + } +} + +} // namespace rgw::dmclock |