summaryrefslogtreecommitdiffstats
path: root/src/common/Throttle.h
blob: e190b946c458a8c89619a3a5e84d21c8893895a3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H

#include <atomic>
#include <chrono>
#include <iostream>
#include <list>
#include <map>

#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "common/ThrottleInterface.h"
#include "common/Timer.h"
#include "common/convenience.h"
#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
#include "crimson/common/perf_counters_collection.h"
#else
#include "common/perf_counters_collection.h"
#endif

/**
 * @class Throttle
 * Throttles the maximum number of active requests.
 *
 * This class defines the maximum number of slots currently taken away. The
 * excessive requests for more of them are delayed, until some slots are put
 * back, so @p get_current() drops below the limit after fulfills the requests.
 */
class Throttle final : public ThrottleInterface {
  CephContext *cct;
  const std::string name;
  PerfCountersRef logger;
  std::atomic<int64_t> count = { 0 }, max = { 0 };
  std::mutex lock;
  std::list<std::condition_variable> conds;
  const bool use_perf;

public:
  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
  ~Throttle() override;

private:
  void _reset_max(int64_t m);
  bool _should_wait(int64_t c) const {
    int64_t m = max;
    int64_t cur = count;
    return
      m &&
      ((c <= m && cur + c > m) || // normally stay under max
       (c >= m && cur > m));     // except for large c
  }

  bool _wait(int64_t c, std::unique_lock<std::mutex>& l);

public:
  /**
   * gets the number of currently taken slots
   * @returns the number of taken slots
   */
  int64_t get_current() const {
    return count;
  }

  /**
   * get the max number of slots
   * @returns the max number of slots
   */
  int64_t get_max() const { return max; }

  /**
   * return true if past midpoint
   */
  bool past_midpoint() const {
    return count >= max / 2;
  }

  /**
   * set the new max number, and wait until the number of taken slots drains
   * and drops below this limit.
   *
   * @param m the new max number
   * @returns true if this method is blocked, false it it returns immediately
   */
  bool wait(int64_t m = 0);

  /**
   * take the specified number of slots from the stock regardless the throttling
   * @param c number of slots to take
   * @returns the total number of taken slots
   */
  int64_t take(int64_t c = 1) override;

  /**
   * get the specified amount of slots from the stock, but will wait if the
   * total number taken by consumer would exceed the maximum number.
   * @param c number of slots to get
   * @param m new maximum number to set, ignored if it is 0
   * @returns true if this request is blocked due to the throttling, false 
   * otherwise
   */
  bool get(int64_t c = 1, int64_t m = 0);

  /**
   * the unblocked version of @p get()
   * @returns true if it successfully got the requested amount,
   * or false if it would block.
   */
  bool get_or_fail(int64_t c = 1);

  /**
   * put slots back to the stock
   * @param c number of slots to return
   * @returns number of requests being hold after this
   */
  int64_t put(int64_t c = 1) override;
   /**
   * reset the zero to the stock
   */
  void reset();

  void reset_max(int64_t m) {
    std::lock_guard l(lock);
    _reset_max(m);
  }
};

/**
 * BackoffThrottle
 *
 * Creates a throttle which gradually induces delays when get() is called
 * based on params low_threshold, high_threshold, expected_throughput,
 * high_multiple, and max_multiple.
 *
 * In [0, low_threshold), we want no delay.
 *
 * In [low_threshold, high_threshold), delays should be injected based
 * on a line from 0 at low_threshold to
 * high_multiple * (1/expected_throughput) at high_threshold.
 *
 * In [high_threshold, 1), we want delays injected based on a line from
 * (high_multiple * (1/expected_throughput)) at high_threshold to
 * (high_multiple * (1/expected_throughput)) +
 * (max_multiple * (1/expected_throughput)) at 1.
 *
 * Let the current throttle ratio (current/max) be r, low_threshold be l,
 * high_threshold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
 *
 * delay = 0, r \in [0, l)
 * delay = (r - l) * (e / (h - l)), r \in [l, h)
 * delay = e + (r - h)((m - e)/(1 - h))
 */
class BackoffThrottle {
  const std::string name;
  PerfCountersRef logger;

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  unsigned next_cond = 0;

  /// allocated once to avoid constantly allocating new ones
  std::vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  std::list<std::condition_variable*> waiters;

  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1].
  double low_threshold = 0;
  double high_threshold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// Filled in in set_params
  double s0 = 0; ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0; ///< (m - e)/(1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  ceph::timespan _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets params.  If the params are invalid, returns false
   * and populates errstream (if non-null) with a user comprehensible
   * explanation.
   */
  bool set_params(
    double _low_threshold,
    double _high_threshold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    std::ostream *errstream);

  ceph::timespan get(uint64_t c = 1);
  ceph::timespan wait() {
    return get(0);
  }
  uint64_t put(uint64_t c = 1);
  uint64_t take(uint64_t c = 1);
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
    unsigned expected_concurrency, ///< [in] determines size of conds
    bool _use_perf = true);
  ~BackoffThrottle();
};


/**
 * @class SimpleThrottle
 * This is a simple way to bound the number of concurrent operations.
 *
 * It tracks the first error encountered, and makes it available
 * when all requests are complete. wait_for_ret() should be called
 * before the instance is destroyed.
 *
 * Re-using the same instance isn't safe if you want to check each set
 * of operations for errors, since the return value is not reset.
 */
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();
  void start_op();
  void end_op(int r);
  bool pending_error() const;
  int wait_for_ret();
private:
  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;
  uint64_t m_current = 0;
  int m_ret = 0;
  bool m_ignore_enoent;
  uint32_t waiters = 0;
};


class OrderedThrottle;

class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle;
  uint64_t m_tid;
};

/**
 * @class OrderedThrottle
 * Throttles the maximum number of active requests and completes them in order
 *
 * Operations can complete out-of-order but their associated Context callback
 * will completed in-order during invocation of start_op() and wait_for_ret()
 */
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);
  ~OrderedThrottle();

  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  void finish_op(uint64_t tid, int r);

private:
  struct Result {
    bool finished;
    int ret_val;
    Context *on_finish;

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  typedef std::map<uint64_t, Result> TidResult;

  mutable std::mutex m_lock;
  std::condition_variable m_cond;
  uint64_t m_max;
  uint64_t m_current = 0;
  int m_ret_val = 0;
  bool m_ignore_enoent;

  uint64_t m_next_tid = 0;
  uint64_t m_complete_tid = 0;

  TidResult m_tid_result;

  void complete_pending_ops(std::unique_lock<std::mutex>& l);
  uint32_t waiters = 0;
};


class TokenBucketThrottle {
  struct Bucket {
    CephContext *cct;
    const std::string name;

    uint64_t remain;
    uint64_t max;
    uint64_t capacity;
    uint64_t available;

    Bucket(CephContext *cct, const std::string &name, uint64_t m)
      : cct(cct), name(name), remain(m), max(m), capacity(m), available(m) {}

    uint64_t get(uint64_t c);
    uint64_t put(uint64_t tokens, double burst_ratio);
    void set_max(uint64_t max, uint64_t burst_seconds);
  };

  struct Blocker {
    uint64_t tokens_requested;
    Context *ctx;

    Blocker(uint64_t _tokens_requested, Context* _ctx)
      : tokens_requested(_tokens_requested), ctx(_ctx) {}
  };

  CephContext *m_cct;
  const std::string m_name;
  Bucket m_throttle;
  uint64_t m_burst = 0;
  uint64_t m_avg = 0;
  SafeTimer *m_timer;
  ceph::mutex *m_timer_lock;
  Context *m_token_ctx = nullptr;
  std::list<Blocker> m_blockers;
  ceph::mutex m_lock;

  // minimum of the filling period.
  uint64_t m_tick_min = 50;
  // tokens filling period, its unit is millisecond.
  uint64_t m_tick = 0;
  /**
   * These variables are used to calculate how many tokens need to be put into
   * the bucket within each tick.
   *
   * In actual use, the tokens to be put per tick(m_avg / m_ticks_per_second)
   * may be a floating point number, but we need an 'uint64_t' to put into the
   * bucket.
   *
   * For example, we set the value of rate to be 950, means 950 iops(or bps).
   *
   * In this case, the filling period(m_tick) should be 1000 / 950 = 1.052,
   * which is too small for the SafeTimer. So we should set the period(m_tick)
   * to be 50(m_tick_min), and 20 ticks in one second(m_ticks_per_second).
   * The tokens filled in bucket per tick is 950 / 20 = 47.5, not an integer.
   *
   * To resolve this, we use a method called tokens_filled(m_current_tick) to
   * calculate how many tokens will be put so far(until m_current_tick):
   *
   *   tokens_filled = m_current_tick / m_ticks_per_second * m_avg
   *
   * And the difference between two ticks will be the result we expect.
   *   tokens in tick 0: (1 / 20 * 950) - (0 / 20 * 950) =  47 -   0 = 47
   *   tokens in tick 1: (2 / 20 * 950) - (1 / 20 * 950) =  95 -  47 = 48
   *   tokens in tick 2: (3 / 20 * 950) - (2 / 20 * 950) = 142 -  95 = 47
   *
   * As a result, the tokens filled in one second will shown as this:
   *   tick    | 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|16|17|18|19|20|
   *   tokens  |47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|47|48|
   */
  uint64_t m_ticks_per_second = 0;
  uint64_t m_current_tick = 0;

  // period for the bucket filling tokens, its unit is seconds.
  double m_schedule_tick = 1.0;

public:
  TokenBucketThrottle(CephContext *cct, const std::string &name,
                      uint64_t burst, uint64_t avg,
                      SafeTimer *timer, ceph::mutex *timer_lock);

  ~TokenBucketThrottle();

  const std::string &get_name() {
    return m_name;
  }

  template <typename T, typename MF, typename I>
  void add_blocker(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    auto ctx = new LambdaContext(
      [t, mf, item=std::forward<I>(item), flag](int) mutable {
        (t->*mf)(std::forward<I>(item), flag);
      });
    m_blockers.emplace_back(c, ctx);
  }

  template <typename T, typename MF, typename I>
  bool get(uint64_t c, T&& t, MF&& mf, I&& item, uint64_t flag) {
    bool wait = false;
    uint64_t got = 0;
    std::lock_guard lock(m_lock);
    if (!m_blockers.empty()) {
      // Keep the order of requests, add item after previous blocked requests.
      wait = true;
    } else {
      if (0 == m_throttle.max || 0 == m_avg)
        return false;

      got = m_throttle.get(c);
      if (got < c) {
        // Not enough tokens, add a blocker for it.
        wait = true;
      }
    }

    if (wait) {
      add_blocker(c - got, std::forward<T>(t), std::forward<MF>(mf),
                  std::forward<I>(item), flag);
    }

    return wait;
  }

  int set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds);
  void set_schedule_tick_min(uint64_t tick);

private:
  uint64_t tokens_filled(double tick);
  uint64_t tokens_this_tick();
  void add_tokens();
  void schedule_timer();
  void cancel_timer();
};

#endif