summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_ratelimit.h
blob: 2639d4d42749f0201426cc5ba71075eabcab777e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#pragma once

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>

#include "rgw_common.h"


// Per-tenant (user or bucket) token-bucket state. Tokens are stored in a
// fixed-point representation and refilled lazily on each request based on
// the elapsed time since the last refill. All public methods take ts_lock,
// so one entry is safe to use from multiple request threads.
class RateLimiterEntry {
  /*
    fixed_point_rgw_ratelimit preserves the precision of the token calculation.
    For example: a user with a limit of a single op per minute consumes its one
    token and then sends another request 1s later. Without fixed point the user
    would be credited 0 tokens although it earned 0.016 of a token. With this
    scheme the entry is credited 16 fixed-point tokens instead; every comparison
    divides by fixed_point_rgw_ratelimit, so the user stays blocked until a
    whole token has accumulated, and no fractional credit is ever lost.
  */
  static constexpr int64_t fixed_point_rgw_ratelimit = 1000;
  // counters are tracked in multiples of fixed_point_rgw_ratelimit
  struct counters {
    int64_t ops = 0;
    int64_t bytes = 0;
  };
  counters read;   // remaining read tokens (ops + bytes), fixed point
  counters write;  // remaining write tokens (ops + bytes), fixed point
  ceph::timespan ts;       // timestamp of the last token refill
  bool first_run = true;   // buckets start full on the first request
  std::mutex ts_lock;      // guards all mutable state above
  // These functions return the integer (whole-token) value of the counters.
  // Note: C++ integer division truncates toward zero, so a small negative
  // byte debt still reads as 0 here (see decrease_bytes for how debt arises).
  int64_t read_ops () const
  {
    return read.ops / fixed_point_rgw_ratelimit;
  }
  int64_t write_ops() const
  {
    return write.ops / fixed_point_rgw_ratelimit;
  }
  int64_t read_bytes() const
  {
    return read.bytes / fixed_point_rgw_ratelimit;
  }
  int64_t write_bytes() const
  {
    return write.bytes / fixed_point_rgw_ratelimit;
  }
  // Returns true (= reject the request) if the read bucket is exhausted.
  // A limit of 0 means "unlimited" for that dimension, so an empty bucket
  // only rejects when the corresponding limit is actually enabled (> 0).
  // On acceptance, one whole op token is consumed; bytes are consumed
  // later, via decrease_bytes, once the transferred size is known.
  bool should_rate_limit_read(int64_t ops_limit, int64_t bw_limit) {
    //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
    if(((read_ops() - 1 < 0) && (ops_limit > 0)) ||
      (read_bytes() < 0 && bw_limit > 0))
  {
    return true;
  }
    // we don't want to reduce ops' tokens if we've rejected it.
    read.ops -= fixed_point_rgw_ratelimit;
    return false;
  }
  // Write-side twin of should_rate_limit_read; same accept/reject logic
  // applied to the write counters and limits.
  bool should_rate_limit_write(int64_t ops_limit, int64_t bw_limit) 
  {
    //check if tenants did not reach their bw or ops limits and that the limits are not 0 (which is unlimited)
    if(((write_ops() - 1 < 0) && (ops_limit > 0)) ||
      (write_bytes() < 0 && bw_limit > 0))
    {
      return true;
    }

    // we don't want to reduce ops' tokens if we've rejected it.
    write.ops -= fixed_point_rgw_ratelimit;
    return false;
  }
  /* Enforces a minimum interval (60s / fixed_point = 60ms) before the stored
     timestamp may be overwritten. This guarantees that whenever increase_tokens
     advances ts it credits at least 1 fixed-point token, so the user/bucket
     never loses fractional tokens to rounding.
  */
  bool minimum_time_reached(ceph::timespan curr_timestamp) const
  {
    using namespace std::chrono;
    constexpr auto min_duration = duration_cast<ceph::timespan>(seconds(60)) / fixed_point_rgw_ratelimit;
    const auto delta = curr_timestamp - ts;
    if (delta < min_duration)
    {
      return false;
    }
    return true;
  }

  // Lazily refill the buckets proportionally to the time elapsed since ts.
  // On the very first call the buckets start full. Refill is capped at the
  // configured per-minute maximum, i.e. unused capacity does not accumulate
  // beyond one minute's worth of tokens.
  void increase_tokens(ceph::timespan curr_timestamp,
                       const RGWRateLimitInfo* info)
  {
    constexpr int fixed_point = fixed_point_rgw_ratelimit;
    if (first_run)
    {
      write.ops = info->max_write_ops * fixed_point;
      write.bytes = info->max_write_bytes * fixed_point;
      read.ops = info->max_read_ops * fixed_point;
      read.bytes = info->max_read_bytes * fixed_point;
      ts = curr_timestamp;
      first_run = false;
      return;
    }
    else if(curr_timestamp > ts && minimum_time_reached(curr_timestamp))
    {
      // fraction of a minute that has elapsed, scaled by fixed_point
      // (the limits are defined per minute, hence the / 60.0)
      const int64_t time_in_ms = std::chrono::duration_cast<std::chrono::milliseconds>(curr_timestamp - ts).count() / 60.0 / std::milli::den * fixed_point; // / 60 to make it work with 1 min token bucket
      ts = curr_timestamp;
      const int64_t write_ops = info->max_write_ops * time_in_ms;
      const int64_t write_bw = info->max_write_bytes * time_in_ms;
      const int64_t read_ops = info->max_read_ops * time_in_ms;
      const int64_t read_bw = info->max_read_bytes * time_in_ms;
      // clamp each bucket to its configured one-minute capacity
      read.ops = std::min(info->max_read_ops * fixed_point, read_ops + read.ops);
      read.bytes = std::min(info->max_read_bytes * fixed_point, read_bw + read.bytes);
      write.ops = std::min(info->max_write_ops * fixed_point, write_ops + write.ops);
      write.bytes = std::min(info->max_write_bytes * fixed_point, write_bw + write.bytes);
    }
  }

  public:
    // Refill the buckets for the elapsed time, then decide whether this
    // request must be rejected. Returns true to rate-limit (reject).
    bool should_rate_limit(bool is_read, const RGWRateLimitInfo* ratelimit_info, ceph::timespan curr_timestamp)
    {
      std::unique_lock lock(ts_lock);
      increase_tokens(curr_timestamp, ratelimit_info);
      if (is_read)
      {
        return should_rate_limit_read(ratelimit_info->max_read_ops, ratelimit_info->max_read_bytes);
      }
      return should_rate_limit_write(ratelimit_info->max_write_ops, ratelimit_info->max_write_bytes);
    }
    // Charge transferred bytes against the bucket after the fact. The bucket
    // may go negative (debt), which blocks subsequent requests until refills
    // pay it off.
    void decrease_bytes(bool is_read, int64_t amount, const RGWRateLimitInfo* info) {
      std::unique_lock lock(ts_lock);
      // we don't want the tenant to be with higher debt than 120 seconds(2 min) of its limit
      if (is_read)
      {
        read.bytes = std::max(read.bytes - amount * fixed_point_rgw_ratelimit,info->max_read_bytes * fixed_point_rgw_ratelimit * -2);
      } else {
        write.bytes = std::max(write.bytes - amount * fixed_point_rgw_ratelimit,info->max_write_bytes * fixed_point_rgw_ratelimit * -2);
      }
    }
    // Return the op token consumed by should_rate_limit, e.g. when the
    // request was accepted here but failed before doing any work.
    void giveback_tokens(bool is_read)
    {
      std::unique_lock lock(ts_lock);
      if (is_read) 
      {
        read.ops += fixed_point_rgw_ratelimit;
      } else {
        write.ops += fixed_point_rgw_ratelimit;
      }
    }
};

// A map from tenant key (user id or bucket) to its RateLimiterEntry, with a
// readers-writer lock protecting insertion. When the map approaches capacity
// it signals the owning ActiveRateLimiter (via `replacing`/`cv`) to swap in
// a fresh instance and garbage-collect this one.
class RateLimiter {

  static constexpr size_t map_size = 2000000; // will create it with the closest upper prime number
  std::shared_mutex insert_lock;           // guards insertion into ratelimit_entries
  std::atomic_bool& replacing;             // owned by ActiveRateLimiter; set here to request a swap
  std::condition_variable& cv;             // owned by ActiveRateLimiter; wakes its gc thread
  typedef std::unordered_map<std::string, RateLimiterEntry> hash_map;
  hash_map ratelimit_entries{map_size};
  // Only GET/HEAD count as reads; everything else is charged as a write.
  static bool is_read_op(const std::string_view method) {
    if (method == "GET" || method == "HEAD")
    {
      return true;
    }
    return false;
  }

    // find or create an entry, and return a reference to it
  auto& find_or_create(const std::string& key) {
    std::shared_lock rlock(insert_lock);
    // 90% watermark: ask the gc thread to swap to the other RateLimiter.
    // NOTE(review): cv.notify_all() is called without holding the gc
    // thread's mutex; the gc side should tolerate a missed wakeup — verify.
    if (ratelimit_entries.size() > 0.9 * map_size && replacing == false)
    {
      replacing = true;
      cv.notify_all();
    }
    auto ret = ratelimit_entries.find(key);
    rlock.unlock();
    // Double-checked insert: if the key was absent, retry under the write
    // lock. emplace is a no-op returning the existing entry if another
    // thread inserted the key in the unlocked window. The returned reference
    // stays valid because unordered_map never invalidates references on
    // insert (and max_load_factor below prevents rehash of iterators).
    if (ret == ratelimit_entries.end())
    {
      std::unique_lock wlock(insert_lock);
      ret = ratelimit_entries.emplace(std::piecewise_construct,
                                 std::forward_as_tuple(key),
                                 std::forward_as_tuple()).first;
    }
    return ret->second;
  }

  

  public:
    // Non-copyable/non-movable: insert_lock and the external references
    // pin this object in place.
    RateLimiter(const RateLimiter&) = delete;
    RateLimiter& operator =(const RateLimiter&) = delete;
    RateLimiter(RateLimiter&&) = delete;
    RateLimiter& operator =(RateLimiter&&) = delete;
    RateLimiter() = delete;
    RateLimiter(std::atomic_bool& replacing, std::condition_variable& cv)
      : replacing(replacing), cv(cv)
    {
      // prevents rehash, so no iterators invalidation
      ratelimit_entries.max_load_factor(1000);
    };

    // Entry point per request: returns true if the request should be
    // rejected. Keys of length <= 1 or a disabled config short-circuit to
    // "allow" without touching the map.
    bool should_rate_limit(const char *method, const std::string& key, ceph::coarse_real_time curr_timestamp, const RGWRateLimitInfo* ratelimit_info) {
      if (key.empty() || key.length() == 1 || !ratelimit_info->enabled)
      {
        return false;
      }
      bool is_read = is_read_op(method);
      auto& it = find_or_create(key);
      auto curr_ts = curr_timestamp.time_since_epoch();
      return it.should_rate_limit(is_read ,ratelimit_info, curr_ts);
    }
    // Refund an op token for a request that was admitted but not served.
    // NOTE(review): unlike the other entry points this does not guard
    // against empty/one-char keys, so it may create an entry — confirm
    // callers only pass keys previously seen by should_rate_limit.
    void giveback_tokens(const char *method, const std::string& key)
    {
      bool is_read = is_read_op(method);
      auto& it = find_or_create(key);
      it.giveback_tokens(is_read);
    }
    // Charge transferred bytes to the tenant, skipping entirely when the
    // relevant bandwidth limit is 0 (unlimited).
    void decrease_bytes(const char *method, const std::string& key, const int64_t amount, const RGWRateLimitInfo* info) {
      if (key.empty() || key.length() == 1 || !info->enabled)
      {
        return;
      }
      bool is_read = is_read_op(method);
      if ((is_read && !info->max_read_bytes) || (!is_read && !info->max_write_bytes))
      {
        return;
      }
      auto& it = find_or_create(key);
      it.decrease_bytes(is_read, amount, info);
    }
    // Drop all entries; called by the gc thread once no request holds this
    // instance anymore, so no locking is needed here.
    void clear() {
      ratelimit_entries.clear();
    }
};
// This class's purpose is to hold 2 RateLimiter instances, one active and one passive.
// Once the active one reaches the clearing watermark, it wakes the replace_active() thread via cv.
// replace_active() clears the previously active RateLimiter after all in-flight requests to it
// have completed (i.e. once its use_count() drops back to 1).
// In the meantime, new requests go to the newly active instance.
class ActiveRateLimiter : public DoutPrefix  {
  std::atomic_uint8_t stopped = {false};   // shutdown flag for the gc thread
  std::condition_variable cv;              // wakes the gc thread (swap request or shutdown)
  std::mutex cv_m;                         // protects cv waits
  std::thread runner;                      // the ratelimit_gc thread (created by start())
  std::atomic_bool replacing = false;      // set by the active RateLimiter at its watermark
  std::atomic_uint8_t current_active = 0;  // index (0/1) of the active RateLimiter
  std::shared_ptr<RateLimiter> ratelimit[2];
  // GC loop: wait until a swap is requested (or shutdown), flip the active
  // index so new requests land in the other instance, then clear the
  // now-passive instance once every in-flight request has released its
  // shared_ptr (use_count() back to 1).
  void replace_active() {
    using namespace std::chrono_literals;
    std::unique_lock<std::mutex> lk(cv_m);
    while (!stopped) {
      // BUGFIX: wait with a predicate. The previous bare cv.wait(lk) would,
      // on a spurious wakeup, flip current_active and clear a RateLimiter
      // that was still in active use.
      cv.wait(lk, [this] { return replacing || stopped; });
      if (stopped)
      {
        return;
      }
      current_active = current_active ^ 1;
      ldpp_dout(this, 20) << "replacing active ratelimit data structure" << dendl;
      // Poll once a minute until the passive instance is no longer
      // referenced by any request; bail out early on shutdown.
      while (!stopped && ratelimit[(current_active ^ 1)].use_count() > 1 ) {
        if (cv.wait_for(lk, 1min) != std::cv_status::timeout && stopped)
        {
          return;
        }
      }
      if (stopped)
      {
        return;
      }
      ldpp_dout(this, 20) << "clearing passive ratelimit data structure" << dendl;
      ratelimit[(current_active ^ 1)]->clear();
      // allow the (new) active instance to request the next swap
      replacing = false;
    }
  }
  public:
    ActiveRateLimiter(const ActiveRateLimiter&) = delete;
    ActiveRateLimiter& operator =(const ActiveRateLimiter&) = delete;
    ActiveRateLimiter(ActiveRateLimiter&&) = delete;
    ActiveRateLimiter& operator =(ActiveRateLimiter&&) = delete;
    ActiveRateLimiter() = delete;
    ActiveRateLimiter(CephContext* cct) :
      DoutPrefix(cct, ceph_subsys_rgw, "rate limiter: ")
    {
      ratelimit[0] = std::make_shared<RateLimiter>(replacing, cv);
      ratelimit[1] = std::make_shared<RateLimiter>(replacing, cv);
    }
    ~ActiveRateLimiter() {
      ldpp_dout(this, 20) << "stopping ratelimit_gc thread" << dendl;
      // take the lock so the flag flip cannot race between the gc thread's
      // predicate check and its wait
      cv_m.lock();
      stopped = true;
      cv_m.unlock();
      cv.notify_all();
      // BUGFIX: start() may never have been called; join() on a
      // non-joinable thread throws std::system_error / terminates.
      if (runner.joinable()) {
        runner.join();
      }
    }
    // Requests must go through here each time so they pick up the current
    // active instance after a swap.
    std::shared_ptr<RateLimiter> get_active() {
      return ratelimit[current_active];
    }
    // Spawn the gc thread. Must be called at most once before destruction.
    void start() {
      ldpp_dout(this, 20) << "starting ratelimit_gc thread" << dendl;
      runner = std::thread(&ActiveRateLimiter::replace_active, this);
      const auto rc = ceph_pthread_setname(runner.native_handle(), "ratelimit_gc");
      ceph_assert(rc==0);
    }
};