Diffstat (limited to 'src/common/Throttle.cc')
-rw-r--r--   src/common/Throttle.cc   887
1 file changed, 887 insertions(+), 0 deletions(-)
diff --git a/src/common/Throttle.cc b/src/common/Throttle.cc
new file mode 100644
index 000000000..7af940279
--- /dev/null
+++ b/src/common/Throttle.cc
@@ -0,0 +1,887 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "include/scope_guard.h"
+
+#include "common/Throttle.h"
+#include "common/ceph_time.h"
+#include "common/perf_counters.h"
+
+
+// re-include our assert to clobber the system one; fix dout:
+#include "include/ceph_assert.h"
+
+#define dout_subsys ceph_subsys_throttle
+
+#undef dout_prefix
+#define dout_prefix *_dout << "throttle(" << name << " " << (void*)this << ") "
+
+using std::list;
+using std::ostream;
+using std::string;
+
+using ceph::mono_clock;
+using ceph::mono_time;
+using ceph::timespan;
+
+enum {
+  l_throttle_first = 532430,
+  l_throttle_val,
+  l_throttle_max,
+  l_throttle_get_started,
+  l_throttle_get,
+  l_throttle_get_sum,
+  l_throttle_get_or_fail_fail,
+  l_throttle_get_or_fail_success,
+  l_throttle_take,
+  l_throttle_take_sum,
+  l_throttle_put,
+  l_throttle_put_sum,
+  l_throttle_wait,
+  l_throttle_last,
+};
+
+Throttle::Throttle(CephContext *cct, const std::string& n, int64_t m,
+                   bool _use_perf)
+  : cct(cct), name(n), max(m),
+    use_perf(_use_perf)
+{
+  ceph_assert(m >= 0);
+
+  if (!use_perf)
+    return;
+
+  if (cct->_conf->throttler_perf_counter) {
+    PerfCountersBuilder b(cct, string("throttle-") + name, l_throttle_first, l_throttle_last);
+    b.add_u64(l_throttle_val, "val", "Currently taken slots");
+    b.add_u64(l_throttle_max, "max", "Max value for throttle");
+    b.add_u64_counter(l_throttle_get_started, "get_started", "Number of get calls, increased before wait");
+    b.add_u64_counter(l_throttle_get, "get", "Gets");
+    b.add_u64_counter(l_throttle_get_sum, "get_sum", "Got data");
+    b.add_u64_counter(l_throttle_get_or_fail_fail, "get_or_fail_fail", "Get blocked during get_or_fail");
+    b.add_u64_counter(l_throttle_get_or_fail_success, "get_or_fail_success", "Successful get during get_or_fail");
+    b.add_u64_counter(l_throttle_take, "take", "Takes");
+    b.add_u64_counter(l_throttle_take_sum, "take_sum", "Taken data");
+    b.add_u64_counter(l_throttle_put, "put", "Puts");
+    b.add_u64_counter(l_throttle_put_sum, "put_sum", "Put data");
+    b.add_time_avg(l_throttle_wait, "wait", "Waiting latency");
+
+    logger = { b.create_perf_counters(), cct };
+    cct->get_perfcounters_collection()->add(logger.get());
+    logger->set(l_throttle_max, max);
+  }
+}
+
+Throttle::~Throttle()
+{
+  std::lock_guard l(lock);
+  ceph_assert(conds.empty());
+}
+
+void Throttle::_reset_max(int64_t m)
+{
+  // lock must be held.
+  if (max == m)
+    return;
+  if (!conds.empty())
+    conds.front().notify_one();
+  if (logger)
+    logger->set(l_throttle_max, m);
+  max = m;
+}
+
+bool Throttle::_wait(int64_t c, std::unique_lock<std::mutex>& l)
+{
+  mono_time start;
+  bool waited = false;
+  if (_should_wait(c) || !conds.empty()) { // always wait behind other waiters.
+    {
+      auto cv = conds.emplace(conds.end());
+      auto w = make_scope_guard([this, cv]() {
+        conds.erase(cv);
+      });
+      waited = true;
+      ldout(cct, 2) << "_wait waiting..."
+                    << dendl;
+      if (logger)
+        start = mono_clock::now();
+
+      cv->wait(l, [this, c, cv]() { return (!_should_wait(c) &&
+                                            cv == conds.begin()); });
+      ldout(cct, 2) << "_wait finished waiting" << dendl;
+      if (logger) {
+        logger->tinc(l_throttle_wait, mono_clock::now() - start);
+      }
+    }
+    // wake up the next guy
+    if (!conds.empty())
+      conds.front().notify_one();
+  }
+  return waited;
+}
+
+bool Throttle::wait(int64_t m)
+{
+  if (0 == max && 0 == m) {
+    return false;
+  }
+
+  std::unique_lock l(lock);
+  if (m) {
+    ceph_assert(m > 0);
+    _reset_max(m);
+  }
+  ldout(cct, 10) << "wait" << dendl;
+  return _wait(0, l);
+}
+
+int64_t Throttle::take(int64_t c)
+{
+  if (0 == max) {
+    return 0;
+  }
+  ceph_assert(c >= 0);
+  ldout(cct, 10) << "take " << c << dendl;
+  count += c;
+  if (logger) {
+    logger->inc(l_throttle_take);
+    logger->inc(l_throttle_take_sum, c);
+    logger->set(l_throttle_val, count);
+  }
+  return count;
+}
+
+bool Throttle::get(int64_t c, int64_t m)
+{
+  if (0 == max && 0 == m) {
+    count += c;
+    return false;
+  }
+
+  ceph_assert(c >= 0);
+  ldout(cct, 10) << "get " << c << " (" << count.load() << " -> " << (count.load() + c) << ")" << dendl;
+  if (logger) {
+    logger->inc(l_throttle_get_started);
+  }
+  bool waited = false;
+  {
+    std::unique_lock l(lock);
+    if (m) {
+      ceph_assert(m > 0);
+      _reset_max(m);
+    }
+    waited = _wait(c, l);
+    count += c;
+  }
+  if (logger) {
+    logger->inc(l_throttle_get);
+    logger->inc(l_throttle_get_sum, c);
+    logger->set(l_throttle_val, count);
+  }
+  return waited;
+}
+
+/* Returns true if it successfully got the requested amount,
+ * or false if it would block.
+ */
+bool Throttle::get_or_fail(int64_t c)
+{
+  if (0 == max) {
+    count += c;
+    return true;
+  }
+
+  assert (c >= 0);
+  bool result = false;
+  {
+    std::lock_guard l(lock);
+    if (_should_wait(c) || !conds.empty()) {
+      ldout(cct, 10) << "get_or_fail " << c << " failed" << dendl;
+      result = false;
+    } else {
+      ldout(cct, 10) << "get_or_fail " << c << " success (" << count.load()
+                     << " -> " << (count.load() + c) << ")" << dendl;
+      count += c;
+      result = true;
+    }
+  }
+
+  if (logger) {
+    if (result) {
+      logger->inc(l_throttle_get_or_fail_success);
+      logger->inc(l_throttle_get);
+      logger->inc(l_throttle_get_sum, c);
+      logger->set(l_throttle_val, count);
+    } else {
+      logger->inc(l_throttle_get_or_fail_fail);
+    }
+  }
+  return result;
+}
+
+int64_t Throttle::put(int64_t c)
+{
+  if (0 == max) {
+    count -= c;
+    return 0;
+  }
+
+  ceph_assert(c >= 0);
+  ldout(cct, 10) << "put " << c << " (" << count.load() << " -> "
+                 << (count.load()-c) << ")" << dendl;
+  int64_t new_count;
+  {
+    std::lock_guard l(lock);
+    new_count = count;
+    if (c) {
+      if (!conds.empty())
+        conds.front().notify_one();
+      // if count goes negative, we failed somewhere!
+      ceph_assert(count >= c);
+      new_count = count -= c;
+    }
+  }
+  if (logger) {
+    logger->inc(l_throttle_put);
+    logger->inc(l_throttle_put_sum, c);
+    logger->set(l_throttle_val, count);
+  }
+
+  return new_count;
+}
+
+void Throttle::reset()
+{
+  std::lock_guard l(lock);
+  if (!conds.empty())
+    conds.front().notify_one();
+  count = 0;
+  if (logger) {
+    logger->set(l_throttle_val, 0);
+  }
+}
+
+enum {
+  l_backoff_throttle_first = l_throttle_last + 1,
+  l_backoff_throttle_val,
+  l_backoff_throttle_max,
+  l_backoff_throttle_get,
+  l_backoff_throttle_get_sum,
+  l_backoff_throttle_take,
+  l_backoff_throttle_take_sum,
+  l_backoff_throttle_put,
+  l_backoff_throttle_put_sum,
+  l_backoff_throttle_wait,
+  l_backoff_throttle_last,
+};
+
+BackoffThrottle::BackoffThrottle(CephContext *cct, const std::string& n,
+                                 unsigned expected_concurrency, bool _use_perf)
+  : name(n),
+    conds(expected_concurrency), ///< [in] determines size of conds
+    use_perf(_use_perf)
+{
+  if (!use_perf)
+    return;
+
+  if (cct->_conf->throttler_perf_counter) {
+    PerfCountersBuilder b(cct, string("throttle-") + name,
+                          l_backoff_throttle_first, l_backoff_throttle_last);
+    b.add_u64(l_backoff_throttle_val, "val", "Currently available throttle");
+    b.add_u64(l_backoff_throttle_max, "max", "Max value for throttle");
+    b.add_u64_counter(l_backoff_throttle_get, "get", "Gets");
+    b.add_u64_counter(l_backoff_throttle_get_sum, "get_sum", "Got data");
+    b.add_u64_counter(l_backoff_throttle_take, "take", "Takes");
+    b.add_u64_counter(l_backoff_throttle_take_sum, "take_sum", "Taken data");
+    b.add_u64_counter(l_backoff_throttle_put, "put", "Puts");
+    b.add_u64_counter(l_backoff_throttle_put_sum, "put_sum", "Put data");
+    b.add_time_avg(l_backoff_throttle_wait, "wait", "Waiting latency");
+
+    logger = { b.create_perf_counters(), cct };
+    cct->get_perfcounters_collection()->add(logger.get());
+    logger->set(l_backoff_throttle_max, max);
+  }
+}
+
+BackoffThrottle::~BackoffThrottle()
+{
+  std::lock_guard l(lock);
+  ceph_assert(waiters.empty());
+}
+
+bool BackoffThrottle::set_params(
+  double _low_threshold,
+  double _high_threshold,
+  double _expected_throughput,
+  double _high_multiple,
+  double _max_multiple,
+  uint64_t _throttle_max,
+  ostream *errstream)
+{
+  bool valid = true;
+  if (_low_threshold > _high_threshold) {
+    valid = false;
+    if (errstream) {
+      *errstream << "low_threshold (" << _low_threshold
+                 << ") > high_threshold (" << _high_threshold
+                 << ")" << std::endl;
+    }
+  }
+
+  if (_high_multiple > _max_multiple) {
+    valid = false;
+    if (errstream) {
+      *errstream << "_high_multiple (" << _high_multiple
+                 << ") > _max_multiple (" << _max_multiple
+                 << ")" << std::endl;
+    }
+  }
+
+  if (_low_threshold > 1 || _low_threshold < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid low_threshold (" << _low_threshold << ")"
+                 << std::endl;
+    }
+  }
+
+  if (_high_threshold > 1 || _high_threshold < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid high_threshold (" << _high_threshold << ")"
+                 << std::endl;
+    }
+  }
+
+  if (_max_multiple < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _max_multiple ("
+                 << _max_multiple << ")"
+                 << std::endl;
+    }
+  }
+
+  if (_high_multiple < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _high_multiple ("
+                 << _high_multiple << ")"
+                 << std::endl;
+    }
+  }
+
+  if (_expected_throughput < 0) {
+    valid = false;
+    if (errstream) {
+      *errstream << "invalid _expected_throughput("
+                 << _expected_throughput << ")"
+                 << std::endl;
+    }
+  }
+
+  if (!valid)
+    return false;
+
+  locker l(lock);
+  low_threshold = _low_threshold;
+  high_threshold = _high_threshold;
+  high_delay_per_count = _high_multiple / _expected_throughput;
+  max_delay_per_count = _max_multiple / _expected_throughput;
+  max = _throttle_max;
+
+  if (logger)
+    logger->set(l_backoff_throttle_max, max);
+
+  if (high_threshold - low_threshold > 0) {
+    s0 = high_delay_per_count / (high_threshold - low_threshold);
+  } else {
+    low_threshold = high_threshold;
+    s0 = 0;
+  }
+
+  if (1 - high_threshold > 0) {
+    s1 = (max_delay_per_count - high_delay_per_count)
+      / (1 - high_threshold);
+  } else {
+    high_threshold = 1;
+    s1 = 0;
+  }
+
+  _kick_waiters();
+  return true;
+}
+
+ceph::timespan BackoffThrottle::_get_delay(uint64_t c) const
+{
+  if (max == 0)
+    return ceph::timespan(0);
+
+  double r = ((double)current) / ((double)max);
+  if (r < low_threshold) {
+    return ceph::timespan(0);
+  } else if (r < high_threshold) {
+    return c * ceph::make_timespan(
+      (r - low_threshold) * s0);
+  } else {
+    return c * ceph::make_timespan(
+      high_delay_per_count + ((r - high_threshold) * s1));
+  }
+}
+
+ceph::timespan BackoffThrottle::get(uint64_t c)
+{
+  locker l(lock);
+  auto delay = _get_delay(c);
+
+  if (logger) {
+    logger->inc(l_backoff_throttle_get);
+    logger->inc(l_backoff_throttle_get_sum, c);
+  }
+
+  // fast path
+  if (delay.count() == 0 &&
+      waiters.empty() &&
+      ((max == 0) || (current == 0) || ((current + c) <= max))) {
+    current += c;
+
+    if (logger) {
+      logger->set(l_backoff_throttle_val, current);
+    }
+
+    return ceph::make_timespan(0);
+  }
+
+  auto ticket = _push_waiter();
+  auto wait_from = mono_clock::now();
+  bool waited = false;
+
+  while (waiters.begin() != ticket) {
+    (*ticket)->wait(l);
+    waited = true;
+  }
+
+  auto start = mono_clock::now();
+  delay = _get_delay(c);
+  while (true) {
+    if (max != 0 && current != 0 && (current + c) > max) {
+      (*ticket)->wait(l);
+      waited = true;
+    } else if (delay.count() > 0) {
+      (*ticket)->wait_for(l, delay);
+      waited = true;
+    } else {
+      break;
+    }
+    ceph_assert(ticket == waiters.begin());
+    delay = _get_delay(c);
+    auto elapsed = mono_clock::now() - start;
+    if (delay <= elapsed) {
+      delay = timespan::zero();
+    } else {
+      delay -= elapsed;
+    }
+  }
+  waiters.pop_front();
+  _kick_waiters();
+
+  current += c;
+
+  if (logger) {
+    logger->set(l_backoff_throttle_val, current);
+    if (waited) {
+      logger->tinc(l_backoff_throttle_wait, mono_clock::now() - wait_from);
+    }
+  }
+
+  return mono_clock::now() - start;
+}
+
+uint64_t BackoffThrottle::put(uint64_t c)
+{
+  locker l(lock);
+  ceph_assert(current >= c);
+  current -= c;
+  _kick_waiters();
+
+  if (logger) {
+    logger->inc(l_backoff_throttle_put);
+    logger->inc(l_backoff_throttle_put_sum, c);
+    logger->set(l_backoff_throttle_val, current);
+  }
+
+  return current;
+}
+
+uint64_t BackoffThrottle::take(uint64_t c)
+{
+  locker l(lock);
+  current += c;
+
+  if (logger) {
+    logger->inc(l_backoff_throttle_take);
+    logger->inc(l_backoff_throttle_take_sum, c);
+    logger->set(l_backoff_throttle_val, current);
+  }
+
+  return current;
+}
+
+uint64_t BackoffThrottle::get_current()
+{
+  locker l(lock);
+  return current;
+}
+
+uint64_t BackoffThrottle::get_max()
+{
+  locker l(lock);
+  return max;
+}
+
+SimpleThrottle::SimpleThrottle(uint64_t max, bool ignore_enoent)
+  : m_max(max), m_ignore_enoent(ignore_enoent) {}
+
+SimpleThrottle::~SimpleThrottle()
+{
+  std::lock_guard l(m_lock);
+  ceph_assert(m_current == 0);
+  ceph_assert(waiters == 0);
+}
+
+void SimpleThrottle::start_op()
+{
+  std::unique_lock l(m_lock);
+  waiters++;
+  m_cond.wait(l, [this]() { return m_max != m_current; });
+  waiters--;
+  ++m_current;
+}
+
+void SimpleThrottle::end_op(int r)
+{
+  std::lock_guard l(m_lock);
+  --m_current;
+  if (r < 0 && !m_ret && !(r == -ENOENT && m_ignore_enoent))
+    m_ret = r;
+  m_cond.notify_all();
+}
+
+bool SimpleThrottle::pending_error() const
+{
+  std::lock_guard l(m_lock);
+  return (m_ret < 0);
+}
+
+int SimpleThrottle::wait_for_ret()
+{
+  std::unique_lock l(m_lock);
+  waiters++;
+  m_cond.wait(l, [this]() { return m_current == 0; });
+  waiters--;
+  return m_ret;
+}
+
+void C_OrderedThrottle::finish(int r) {
+  m_ordered_throttle->finish_op(m_tid, r);
+}
+
+OrderedThrottle::OrderedThrottle(uint64_t max, bool ignore_enoent)
+  : m_max(max), m_ignore_enoent(ignore_enoent) {}
+
+OrderedThrottle::~OrderedThrottle() {
+  std::lock_guard l(m_lock);
+  ceph_assert(waiters == 0);
+}
+
+C_OrderedThrottle *OrderedThrottle::start_op(Context *on_finish) {
+  ceph_assert(on_finish);
+
+  std::unique_lock l(m_lock);
+  uint64_t tid = m_next_tid++;
+  m_tid_result[tid] = Result(on_finish);
+  auto ctx = std::make_unique<C_OrderedThrottle>(this, tid);
+
+  complete_pending_ops(l);
+  while (m_max == m_current) {
+    ++waiters;
+    m_cond.wait(l);
+    --waiters;
+    complete_pending_ops(l);
+  }
+  ++m_current;
+
+  return ctx.release();
+}
+
+void OrderedThrottle::end_op(int r) {
+  std::lock_guard l(m_lock);
+  ceph_assert(m_current > 0);
+
+  if (r < 0 && m_ret_val == 0 && (r != -ENOENT || !m_ignore_enoent)) {
+    m_ret_val = r;
+  }
+  --m_current;
+  m_cond.notify_all();
+}
+
+void OrderedThrottle::finish_op(uint64_t tid, int r) {
+  std::lock_guard l(m_lock);
+
+  auto it = m_tid_result.find(tid);
+  ceph_assert(it != m_tid_result.end());
+
+  it->second.finished = true;
+  it->second.ret_val = r;
+  m_cond.notify_all();
+}
+
+bool OrderedThrottle::pending_error() const {
+  std::lock_guard l(m_lock);
+  return (m_ret_val < 0);
+}
+
+int OrderedThrottle::wait_for_ret() {
+  std::unique_lock l(m_lock);
+  complete_pending_ops(l);
+
+  while (m_current > 0) {
+    ++waiters;
+    m_cond.wait(l);
+    --waiters;
+    complete_pending_ops(l);
+  }
+  return m_ret_val;
+}
+
+void OrderedThrottle::complete_pending_ops(std::unique_lock<std::mutex>& l) {
+  while (true) {
+    auto it = m_tid_result.begin();
+    if (it == m_tid_result.end() || it->first != m_complete_tid ||
+        !it->second.finished) {
+      break;
+    }
+
+    Result result = it->second;
+    m_tid_result.erase(it);
+
+    l.unlock();
+    result.on_finish->complete(result.ret_val);
+    l.lock();
+
+    ++m_complete_tid;
+  }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "TokenBucketThrottle(" << m_name << " " \
+                           << (void*)this << ") "
+
+uint64_t TokenBucketThrottle::Bucket::get(uint64_t c) {
+  if (0 == max) {
+    return 0;
+  }
+
+  uint64_t got = 0;
+  if (available >= c) {
+    // There are enough tokens in the bucket; take c.
+    got = c;
+    available -= c;
+    remain -= c;
+  } else {
+    // There are not enough; take all that is available.
+    got = available;
+    remain -= available;
+    available = 0;
+  }
+  return got;
+}
+
+uint64_t TokenBucketThrottle::Bucket::put(uint64_t tokens, double burst_ratio) {
+  if (0 == max) {
+    return 0;
+  }
+
+  if (tokens) {
+    // put tokens into the bucket
+    uint64_t current = remain;
+    if ((current + tokens) <= capacity) {
+      remain += tokens;
+    } else {
+      remain = capacity;
+    }
+
+    // available tokens increase at burst speed
+    uint64_t available_inc = tokens;
+    if (burst_ratio > 1) {
+      available_inc = (uint64_t)(tokens * burst_ratio);
+    }
+    uint64_t inc_upper_limit = remain > max ?
+      max : remain;
+    if ((available + available_inc) <= inc_upper_limit) {
+      available += available_inc;
+    } else {
+      available = inc_upper_limit;
+    }
+
+  }
+  return remain;
+}
+
+void TokenBucketThrottle::Bucket::set_max(uint64_t max, uint64_t burst_seconds) {
+  // the capacity of the bucket should not be less than max
+  if (burst_seconds < 1) {
+    burst_seconds = 1;
+  }
+  uint64_t new_capacity = max * burst_seconds;
+  if (capacity != new_capacity) {
+    capacity = new_capacity;
+    remain = capacity;
+  }
+  if (available > max || 0 == max) {
+    available = max;
+  }
+  this->max = max;
+}
+
+TokenBucketThrottle::TokenBucketThrottle(
+  CephContext *cct,
+  const std::string &name,
+  uint64_t burst,
+  uint64_t avg,
+  SafeTimer *timer,
+  ceph::mutex *timer_lock)
+  : m_cct(cct), m_name(name),
+    m_throttle(m_cct, name + "_bucket", burst),
+    m_burst(burst), m_avg(avg), m_timer(timer), m_timer_lock(timer_lock),
+    m_lock(ceph::make_mutex(name + "_lock"))
+{}
+
+TokenBucketThrottle::~TokenBucketThrottle() {
+  // cancel the timer events.
+  {
+    std::lock_guard timer_locker(*m_timer_lock);
+    cancel_timer();
+  }
+
+  list<Blocker> tmp_blockers;
+  {
+    std::lock_guard blockers_lock(m_lock);
+    tmp_blockers.splice(tmp_blockers.begin(), m_blockers, m_blockers.begin(), m_blockers.end());
+  }
+
+  for (auto b : tmp_blockers) {
+    b.ctx->complete(0);
+  }
+}
+
+int TokenBucketThrottle::set_limit(uint64_t average, uint64_t burst, uint64_t burst_seconds) {
+  {
+    std::lock_guard lock{m_lock};
+
+    if (0 < burst && burst < average) {
+      // the burst should never be less than the average.
+      return -EINVAL;
+    }
+
+    m_avg = average;
+    m_burst = burst;
+
+    if (0 == average) {
+      // The limit is not set, and no tokens will be put into the bucket.
+      // So, we can schedule the timer slowly, or even cancel it.
+      m_tick = 1000;
+    } else {
+      // calculate the tick (ms); it must not be less than the minimum.
+      m_tick = 1000 / average;
+      if (m_tick < m_tick_min) {
+        m_tick = m_tick_min;
+      }
+
+      // handle the case where the number (avg) is not evenly divisible.
+      m_ticks_per_second = 1000 / m_tick;
+      m_current_tick = 0;
+
+      // for the default configuration of burst.
+      m_throttle.set_max(0 == burst ? average : burst, burst_seconds);
+    }
+    // convert milliseconds to seconds
+    m_schedule_tick = m_tick / 1000.0;
+  }
+
+  // The schedule period will be changed when the average rate is set.
+  {
+    std::lock_guard timer_locker{*m_timer_lock};
+    cancel_timer();
+    schedule_timer();
+  }
+  return 0;
+}
+
+void TokenBucketThrottle::set_schedule_tick_min(uint64_t tick) {
+  std::lock_guard lock(m_lock);
+  if (tick != 0) {
+    m_tick_min = tick;
+  }
+}
+
+uint64_t TokenBucketThrottle::tokens_filled(double tick) {
+  return (0 == m_avg) ? 0 : (tick / m_ticks_per_second * m_avg);
+}
+
+uint64_t TokenBucketThrottle::tokens_this_tick() {
+  if (0 == m_avg) {
+    return 0;
+  }
+  if (m_current_tick >= m_ticks_per_second) {
+    m_current_tick = 0;
+  }
+  m_current_tick++;
+
+  return tokens_filled(m_current_tick) - tokens_filled(m_current_tick - 1);
+}
+
+void TokenBucketThrottle::add_tokens() {
+  list<Blocker> tmp_blockers;
+  {
+    std::lock_guard lock(m_lock);
+    // put tokens into the bucket.
+    double burst_ratio = 1.0;
+    if (m_throttle.max > m_avg && m_avg > 0) {
+      burst_ratio = (double)m_throttle.max / m_avg;
+    }
+    m_throttle.put(tokens_this_tick(), burst_ratio);
+    if (0 == m_avg || 0 == m_throttle.max)
+      tmp_blockers.swap(m_blockers);
+    // check m_blockers from head to tail; if a blocker can get
+    // enough tokens, let it go.
+    while (!m_blockers.empty()) {
+      Blocker &blocker = m_blockers.front();
+      uint64_t got = m_throttle.get(blocker.tokens_requested);
+      if (got == blocker.tokens_requested) {
+        // got enough tokens for the front blocker.
+        tmp_blockers.splice(tmp_blockers.end(), m_blockers, m_blockers.begin());
+      } else {
+        // there are no more tokens.
+        blocker.tokens_requested -= got;
+        break;
+      }
+    }
+  }
+
+  for (auto b : tmp_blockers) {
+    b.ctx->complete(0);
+  }
+}
+
+void TokenBucketThrottle::schedule_timer() {
+  m_token_ctx = new LambdaContext(
+    [this](int r) {
+      schedule_timer();
+    });
+  m_timer->add_event_after(m_schedule_tick, m_token_ctx);
+
+  add_tokens();
+}
+
+void TokenBucketThrottle::cancel_timer() {
+  m_timer->cancel_event(m_token_ctx);
+}
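
Usage note: the blocking Throttle above is the simplest of the four throttles in this file. The sketch below shows how a caller might bound in-flight work with it. It is illustrative only: it assumes a valid CephContext pointer (named cct here), and the capacity of 10 and the request sizes are arbitrary values, not defaults from this commit.

#include "common/Throttle.h"

// Hypothetical caller bounding outstanding work at 10 units.
void example(CephContext *cct)
{
  Throttle throttle(cct, "example-throttle", 10);

  // get() blocks, FIFO behind any earlier waiters, until 4 units fit.
  throttle.get(4);

  // get_or_fail() never blocks: it returns false if the request would
  // exceed max or if other callers are already queued.
  if (throttle.get_or_fail(8)) {
    // ... 8 units of work in flight ...
    throttle.put(8);  // put() wakes the front waiter, if any
  }

  // ... 4 units of work in flight ...
  throttle.put(4);
}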