Diffstat (limited to 'src/rgw/rgw_ratelimit.h')
-rw-r--r--  src/rgw/rgw_ratelimit.h  292
1 file changed, 292 insertions, 0 deletions
diff --git a/src/rgw/rgw_ratelimit.h b/src/rgw/rgw_ratelimit.h
new file mode 100644
index 000000000..2639d4d42
--- /dev/null
+++ b/src/rgw/rgw_ratelimit.h
@@ -0,0 +1,292 @@
+#pragma once
+#include <chrono>
+#include <thread>
+#include <mutex>
+#include <shared_mutex>
+#include <condition_variable>
+#include <atomic>
+#include <unordered_map>
+#include "rgw_common.h"
+
+
+class RateLimiterEntry {
+ /*
+  fixed_point_rgw_ratelimit preserves the precision of the token calculation.
+  For example: a user with a limit of a single op per minute spends its only token and then sends another request 1 second later.
+  Without fixed-point arithmetic the user would be credited 0 tokens, even though it has actually accrued 0.016 tokens.
+  With fixed-point arithmetic the user is credited 16 scaled tokens; every comparison divides by fixed_point_rgw_ratelimit,
+  so the user stays blocked until it has accumulated a whole token.
+ */
+ static constexpr int64_t fixed_point_rgw_ratelimit = 1000;
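+ // Worked example (illustrative numbers): with a limit of 1 op/min and 1 second elapsed,
+ // the refill is 1 * (1000 ms / 60000 ms) * fixed_point_rgw_ratelimit ~ 16 scaled tokens,
+ // i.e. 0.016 ops; read_ops() = 16 / 1000 = 0, so the next request is still rejected.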
+ // counters are tracked in multiples of fixed_point_rgw_ratelimit
+ struct counters {
+ int64_t ops = 0;
+ int64_t bytes = 0;
+ };
+ counters read;
+ counters write;
+ ceph::timespan ts;
+ bool first_run = true;
+ std::mutex ts_lock;
+ // These functions return the whole-token (integer) part of the scaled counters
+ int64_t read_ops() const
+ {
+ return read.ops / fixed_point_rgw_ratelimit;
+ }
+ int64_t write_ops() const
+ {
+ return write.ops / fixed_point_rgw_ratelimit;
+ }
+ int64_t read_bytes() const
+ {
+ return read.bytes / fixed_point_rgw_ratelimit;
+ }
+ int64_t write_bytes() const
+ {
+ return write.bytes / fixed_point_rgw_ratelimit;
+ }
+ bool should_rate_limit_read(int64_t ops_limit, int64_t bw_limit) {
+ // rate limit if the tenant is out of op tokens or is in byte debt, but only when the corresponding limit is non-zero (0 means unlimited)
+ if(((read_ops() - 1 < 0) && (ops_limit > 0)) ||
+ (read_bytes() < 0 && bw_limit > 0))
+ {
+ return true;
+ }
+ // request accepted: consume one op token (rejected requests do not consume tokens)
+ read.ops -= fixed_point_rgw_ratelimit;
+ return false;
+ }
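+ // For illustration: with fixed_point_rgw_ratelimit = 1000, read.ops = 999 gives read_ops() = 0,
+ // so 0 - 1 < 0 and the request is rejected; read.ops = 1000 gives read_ops() = 1, the request is
+ // allowed and 1000 scaled tokens (one whole op) are consumed.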
+ bool should_rate_limit_write(int64_t ops_limit, int64_t bw_limit)
+ {
+ // rate limit if the tenant is out of op tokens or is in byte debt, but only when the corresponding limit is non-zero (0 means unlimited)
+ if(((write_ops() - 1 < 0) && (ops_limit > 0)) ||
+ (write_bytes() < 0 && bw_limit > 0))
+ {
+ return true;
+ }
+
+ // request accepted: consume one op token (rejected requests do not consume tokens)
+ write.ops -= fixed_point_rgw_ratelimit;
+ return false;
+ }
+ /* Enforce a minimum interval before the stored timestamp is overwritten.
+    This guarantees that increase_tokens() adds at least one scaled token whenever it updates the
+    stored timestamp, so the user/bucket does not lose tokens to rounding.
+ */
+ bool minimum_time_reached(ceph::timespan curr_timestamp) const
+ {
+ using namespace std::chrono;
+ constexpr auto min_duration = duration_cast<ceph::timespan>(seconds(60)) / fixed_point_rgw_ratelimit;
+ const auto delta = curr_timestamp - ts;
+ if (delta < min_duration)
+ {
+ return false;
+ }
+ return true;
+ }
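+ // Illustrative arithmetic: min_duration = 60 s / 1000 = 60 ms, so the stored timestamp only
+ // advances in steps of at least 60 ms; even a limit of 1 unit per minute then refills at least
+ // 1 * (60 ms / 60000 ms) * 1000 = 1 scaled token per update.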
+
+ void increase_tokens(ceph::timespan curr_timestamp,
+ const RGWRateLimitInfo* info)
+ {
+ constexpr int fixed_point = fixed_point_rgw_ratelimit;
+ if (first_run)
+ {
+ write.ops = info->max_write_ops * fixed_point;
+ write.bytes = info->max_write_bytes * fixed_point;
+ read.ops = info->max_read_ops * fixed_point;
+ read.bytes = info->max_read_bytes * fixed_point;
+ ts = curr_timestamp;
+ first_run = false;
+ return;
+ }
+ else if(curr_timestamp > ts && minimum_time_reached(curr_timestamp))
+ {
+ // elapsed time as a fraction of one minute, in fixed-point units (the token bucket refills over a 60 s window)
+ const int64_t time_in_ms = std::chrono::duration_cast<std::chrono::milliseconds>(curr_timestamp - ts).count() / 60.0 / std::milli::den * fixed_point;
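+ // Illustrative numbers: for a 6 s gap, 6000 / 60.0 / 1000 * 1000 = 100, i.e. one tenth of a
+ // minute in fixed-point units; a limit of 10 write ops per minute is then credited
+ // 10 * 100 = 1000 scaled tokens, exactly one whole op.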
+ ts = curr_timestamp;
+ const int64_t write_ops = info->max_write_ops * time_in_ms;
+ const int64_t write_bw = info->max_write_bytes * time_in_ms;
+ const int64_t read_ops = info->max_read_ops * time_in_ms;
+ const int64_t read_bw = info->max_read_bytes * time_in_ms;
+ read.ops = std::min(info->max_read_ops * fixed_point, read_ops + read.ops);
+ read.bytes = std::min(info->max_read_bytes * fixed_point, read_bw + read.bytes);
+ write.ops = std::min(info->max_write_ops * fixed_point, write_ops + write.ops);
+ write.bytes = std::min(info->max_write_bytes * fixed_point, write_bw + write.bytes);
+ }
+ }
+
+ public:
+ bool should_rate_limit(bool is_read, const RGWRateLimitInfo* ratelimit_info, ceph::timespan curr_timestamp)
+ {
+ std::unique_lock lock(ts_lock);
+ increase_tokens(curr_timestamp, ratelimit_info);
+ if (is_read)
+ {
+ return should_rate_limit_read(ratelimit_info->max_read_ops, ratelimit_info->max_read_bytes);
+ }
+ return should_rate_limit_write(ratelimit_info->max_write_ops, ratelimit_info->max_write_bytes);
+ }
+ void decrease_bytes(bool is_read, int64_t amount, const RGWRateLimitInfo* info) {
+ std::unique_lock lock(ts_lock);
+ // cap the tenant's byte debt at 2 minutes (120 seconds) worth of its bandwidth limit
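+ // Illustrative limit: with max_read_bytes = 1 MiB per minute the counter is clamped at
+ // -2 MiB (scaled), so even a very large GET leaves at most two minutes of debt, which the
+ // per-minute refill pays back before reads are allowed again.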
+ if (is_read)
+ {
+ read.bytes = std::max(read.bytes - amount * fixed_point_rgw_ratelimit,info->max_read_bytes * fixed_point_rgw_ratelimit * -2);
+ } else {
+ write.bytes = std::max(write.bytes - amount * fixed_point_rgw_ratelimit,info->max_write_bytes * fixed_point_rgw_ratelimit * -2);
+ }
+ }
+ void giveback_tokens(bool is_read)
+ {
+ std::unique_lock lock(ts_lock);
+ if (is_read)
+ {
+ read.ops += fixed_point_rgw_ratelimit;
+ } else {
+ write.ops += fixed_point_rgw_ratelimit;
+ }
+ }
+};
+
+class RateLimiter {
+
+ static constexpr size_t map_size = 2000000; // bucket count hint; the unordered_map implementation typically rounds it up to the next prime
+ std::shared_mutex insert_lock;
+ std::atomic_bool& replacing;
+ std::condition_variable& cv;
+ typedef std::unordered_map<std::string, RateLimiterEntry> hash_map;
+ hash_map ratelimit_entries{map_size};
+ static bool is_read_op(const std::string_view method) {
+ if (method == "GET" || method == "HEAD")
+ {
+ return true;
+ }
+ return false;
+ }
+
+ // find or create an entry, and return a reference to it
+ auto& find_or_create(const std::string& key) {
+ std::shared_lock rlock(insert_lock);
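+ // if the map is ~90% full and no swap is in progress yet, wake the ratelimit_gc thread to start one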
+ if (ratelimit_entries.size() > 0.9 * map_size && replacing == false)
+ {
+ replacing = true;
+ cv.notify_all();
+ }
+ auto ret = ratelimit_entries.find(key);
+ rlock.unlock();
+ if (ret == ratelimit_entries.end())
+ {
+ std::unique_lock wlock(insert_lock);
+ ret = ratelimit_entries.emplace(std::piecewise_construct,
+ std::forward_as_tuple(key),
+ std::forward_as_tuple()).first;
+ }
+ return ret->second;
+ }
+
+
+
+ public:
+ RateLimiter(const RateLimiter&) = delete;
+ RateLimiter& operator =(const RateLimiter&) = delete;
+ RateLimiter(RateLimiter&&) = delete;
+ RateLimiter& operator =(RateLimiter&&) = delete;
+ RateLimiter() = delete;
+ RateLimiter(std::atomic_bool& replacing, std::condition_variable& cv)
+ : replacing(replacing), cv(cv)
+ {
+ // a high max load factor prevents rehashing, so iterators and references are never invalidated
+ ratelimit_entries.max_load_factor(1000);
+ }
+
+ bool should_rate_limit(const char *method, const std::string& key, ceph::coarse_real_time curr_timestamp, const RGWRateLimitInfo* ratelimit_info) {
+ if (key.empty() || key.length() == 1 || !ratelimit_info->enabled)
+ {
+ return false;
+ }
+ bool is_read = is_read_op(method);
+ auto& it = find_or_create(key);
+ auto curr_ts = curr_timestamp.time_since_epoch();
+ return it.should_rate_limit(is_read, ratelimit_info, curr_ts);
+ }
+ void giveback_tokens(const char *method, const std::string& key)
+ {
+ bool is_read = is_read_op(method);
+ auto& it = find_or_create(key);
+ it.giveback_tokens(is_read);
+ }
+ void decrease_bytes(const char *method, const std::string& key, const int64_t amount, const RGWRateLimitInfo* info) {
+ if (key.empty() || key.length() == 1 || !info->enabled)
+ {
+ return;
+ }
+ bool is_read = is_read_op(method);
+ if ((is_read && !info->max_read_bytes) || (!is_read && !info->max_write_bytes))
+ {
+ return;
+ }
+ auto& it = find_or_create(key);
+ it.decrease_bytes(is_read, amount, info);
+ }
+ void clear() {
+ ratelimit_entries.clear();
+ }
+};
+// This class holds two RateLimiter instances, one active and one passive.
+// Once the active instance reaches the clearing watermark, it wakes the replace_active() thread via the condition variable.
+// replace_active() swaps the roles and clears the previous RateLimiter once all requests to it have finished (use_count() drops back to 1).
+// In the meantime, new requests go to the new active instance.
+class ActiveRateLimiter : public DoutPrefix {
+ std::atomic_uint8_t stopped = {false};
+ std::condition_variable cv;
+ std::mutex cv_m;
+ std::thread runner;
+ std::atomic_bool replacing = false;
+ std::atomic_uint8_t current_active = 0;
+ std::shared_ptr<RateLimiter> ratelimit[2];
+ void replace_active() {
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> lk(cv_m);
+ while (!stopped) {
+ cv.wait(lk);
+ current_active = current_active ^ 1;
+ ldpp_dout(this, 20) << "replacing active ratelimit data structure" << dendl;
+ while (!stopped && ratelimit[(current_active ^ 1)].use_count() > 1 ) {
+ if (cv.wait_for(lk, 1min) != std::cv_status::timeout && stopped)
+ {
+ return;
+ }
+ }
+ if (stopped)
+ {
+ return;
+ }
+ ldpp_dout(this, 20) << "clearing passive ratelimit data structure" << dendl;
+ ratelimit[(current_active ^ 1)]->clear();
+ replacing = false;
+ }
+ }
+ public:
+ ActiveRateLimiter(const ActiveRateLimiter&) = delete;
+ ActiveRateLimiter& operator =(const ActiveRateLimiter&) = delete;
+ ActiveRateLimiter(ActiveRateLimiter&&) = delete;
+ ActiveRateLimiter& operator =(ActiveRateLimiter&&) = delete;
+ ActiveRateLimiter() = delete;
+ ActiveRateLimiter(CephContext* cct) :
+ DoutPrefix(cct, ceph_subsys_rgw, "rate limiter: ")
+ {
+ ratelimit[0] = std::make_shared<RateLimiter>(replacing, cv);
+ ratelimit[1] = std::make_shared<RateLimiter>(replacing, cv);
+ }
+ ~ActiveRateLimiter() {
+ ldpp_dout(this, 20) << "stopping ratelimit_gc thread" << dendl;
+ cv_m.lock();
+ stopped = true;
+ cv_m.unlock();
+ cv.notify_all();
+ // join only if start() was actually called and spawned the gc thread
+ if (runner.joinable()) {
+ runner.join();
+ }
+ }
+ std::shared_ptr<RateLimiter> get_active() {
+ return ratelimit[current_active];
+ }
+ void start() {
+ ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
+ runner = std::thread(&ActiveRateLimiter::replace_active, this);
+ const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc");
+ ceph_assert(rc==0);
+ }
+};
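+// Illustrative usage sketch (hedged: the real call sites live in RGW's request-processing code;
+// `cct`, `user_key`, `user_limit`, `now` and `bytes_sent` below are placeholders):
+//
+//   ActiveRateLimiter ratelimit(cct);
+//   ratelimit.start();                                   // spawns the ratelimit_gc thread
+//   auto limiter = ratelimit.get_active();
+//   if (limiter->should_rate_limit("GET", user_key, now, &user_limit)) {
+//     // reject the request, e.g. with 503 SlowDown
+//   }
+//   limiter->decrease_bytes("GET", user_key, bytes_sent, &user_limit);  // account transferred bytes
+//   // giveback_tokens("GET", user_key) can return the consumed op token if the request
+//   // is rejected for an unrelated reason.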