diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-27 18:24:20 +0000 |
commit | 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch) | |
tree | e5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/os/filestore/WBThrottle.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream.tar.xz ceph-upstream.zip |
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/os/filestore/WBThrottle.cc')
-rw-r--r-- | src/os/filestore/WBThrottle.cc | 272 |
1 files changed, 272 insertions, 0 deletions
diff --git a/src/os/filestore/WBThrottle.cc b/src/os/filestore/WBThrottle.cc new file mode 100644 index 00000000..ba2ed131 --- /dev/null +++ b/src/os/filestore/WBThrottle.cc @@ -0,0 +1,272 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "acconfig.h" + +#include "os/filestore/WBThrottle.h" +#include "common/perf_counters.h" +#include "common/errno.h" + +WBThrottle::WBThrottle(CephContext *cct) : + cur_ios(0), cur_size(0), + cct(cct), + logger(NULL), + stopping(true), + lock("WBThrottle::lock", false, true, false), + fs(XFS) +{ + { + Mutex::Locker l(lock); + set_from_conf(); + } + ceph_assert(cct); + PerfCountersBuilder b( + cct, string("WBThrottle"), + l_wbthrottle_first, l_wbthrottle_last); + b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data", NULL, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data", NULL, 0, unit_t(UNIT_BYTES)); + b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations"); + b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations"); + b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write"); + b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries"); + logger = b.create_perf_counters(); + cct->get_perfcounters_collection()->add(logger); + for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i) + logger->set(i, 0); + + cct->_conf.add_observer(this); +} + +WBThrottle::~WBThrottle() { + ceph_assert(cct); + cct->get_perfcounters_collection()->remove(logger); + delete logger; + cct->_conf.remove_observer(this); +} + +void WBThrottle::start() +{ + { + Mutex::Locker l(lock); + stopping = false; + } + create("wb_throttle"); +} + +void WBThrottle::stop() +{ + { + Mutex::Locker l(lock); + stopping = true; + cond.Signal(); + } + + join(); +} + +const char** WBThrottle::get_tracked_conf_keys() const +{ + static const char* KEYS[] = { + "filestore_wbthrottle_btrfs_bytes_start_flusher", + "filestore_wbthrottle_btrfs_bytes_hard_limit", + "filestore_wbthrottle_btrfs_ios_start_flusher", + "filestore_wbthrottle_btrfs_ios_hard_limit", + "filestore_wbthrottle_btrfs_inodes_start_flusher", + "filestore_wbthrottle_btrfs_inodes_hard_limit", + "filestore_wbthrottle_xfs_bytes_start_flusher", + "filestore_wbthrottle_xfs_bytes_hard_limit", + "filestore_wbthrottle_xfs_ios_start_flusher", + "filestore_wbthrottle_xfs_ios_hard_limit", + "filestore_wbthrottle_xfs_inodes_start_flusher", + "filestore_wbthrottle_xfs_inodes_hard_limit", + NULL + }; + return KEYS; +} + +void WBThrottle::set_from_conf() +{ + ceph_assert(lock.is_locked()); + if (fs == BTRFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit; + } else if (fs == XFS) { + size_limits.first = + cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher; + size_limits.second = + cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit; + io_limits.first = + cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher; + io_limits.second = + cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit; + fd_limits.first = + cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher; + fd_limits.second = + cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit; + } else { + ceph_abort_msg("invalid value for fs"); + } + cond.Signal(); +} + +void WBThrottle::handle_conf_change(const ConfigProxy& conf, + const std::set<std::string> &changed) +{ + Mutex::Locker l(lock); + for (const char** i = get_tracked_conf_keys(); *i; ++i) { + if (changed.count(*i)) { + set_from_conf(); + return; + } + } +} + +bool WBThrottle::get_next_should_flush( + boost::tuple<ghobject_t, FDRef, PendingWB> *next) +{ + ceph_assert(lock.is_locked()); + ceph_assert(next); + while (!stopping && (!beyond_limit() || pending_wbs.empty())) + cond.Wait(lock); + if (stopping) + return false; + ceph_assert(!pending_wbs.empty()); + ghobject_t obj(pop_object()); + + ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(obj); + *next = boost::make_tuple(obj, i->second.second, i->second.first); + pending_wbs.erase(i); + return true; +} + + +void *WBThrottle::entry() +{ + Mutex::Locker l(lock); + boost::tuple<ghobject_t, FDRef, PendingWB> wb; + while (get_next_should_flush(&wb)) { + clearing = wb.get<0>(); + cur_ios -= wb.get<2>().ios; + logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios); + logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios); + cur_size -= wb.get<2>().size; + logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size); + logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size); + logger->dec(l_wbthrottle_inodes_dirtied); + logger->inc(l_wbthrottle_inodes_wb); + lock.Unlock(); +#if defined(HAVE_FDATASYNC) + int r = ::fdatasync(**wb.get<1>()); +#else + int r = ::fsync(**wb.get<1>()); +#endif + if (r < 0) { + lderr(cct) << "WBThrottle fsync failed: " << cpp_strerror(errno) << dendl; + ceph_abort(); + } +#ifdef HAVE_POSIX_FADVISE + if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) { + int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); + ceph_assert(fa_r == 0); + } +#endif + lock.Lock(); + clearing = ghobject_t(); + cond.Signal(); + wb = boost::tuple<ghobject_t, FDRef, PendingWB>(); + } + return 0; +} + +void WBThrottle::queue_wb( + FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len, + bool nocache) +{ + Mutex::Locker l(lock); + ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator wbiter = + pending_wbs.find(hoid); + if (wbiter == pending_wbs.end()) { + wbiter = pending_wbs.insert( + make_pair(hoid, + make_pair( + PendingWB(), + fd))).first; + logger->inc(l_wbthrottle_inodes_dirtied); + } else { + remove_object(hoid); + } + + cur_ios++; + logger->inc(l_wbthrottle_ios_dirtied); + cur_size += len; + logger->inc(l_wbthrottle_bytes_dirtied, len); + + wbiter->second.first.add(nocache, len, 1); + insert_object(hoid); + if (beyond_limit()) + cond.Signal(); +} + +void WBThrottle::clear() +{ + Mutex::Locker l(lock); + for (ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.begin(); + i != pending_wbs.end(); + ++i) { +#ifdef HAVE_POSIX_FADVISE + if (cct->_conf->filestore_fadvise && i->second.first.nocache) { + int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED); + ceph_assert(fa_r == 0); + } +#endif + + } + cur_ios = cur_size = 0; + logger->set(l_wbthrottle_ios_dirtied, 0); + logger->set(l_wbthrottle_bytes_dirtied, 0); + logger->set(l_wbthrottle_inodes_dirtied, 0); + pending_wbs.clear(); + lru.clear(); + rev_lru.clear(); + cond.Signal(); +} + +void WBThrottle::clear_object(const ghobject_t &hoid) +{ + Mutex::Locker l(lock); + while (clearing == hoid) + cond.Wait(lock); + ceph::unordered_map<ghobject_t, pair<PendingWB, FDRef> >::iterator i = + pending_wbs.find(hoid); + if (i == pending_wbs.end()) + return; + + cur_ios -= i->second.first.ios; + logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios); + cur_size -= i->second.first.size; + logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size); + logger->dec(l_wbthrottle_inodes_dirtied); + + pending_wbs.erase(i); + remove_object(hoid); + cond.Signal(); +} + +void WBThrottle::throttle() +{ + Mutex::Locker l(lock); + while (!stopping && need_flush()) + cond.Wait(lock); +} |