// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab #include "acconfig.h" #include "os/filestore/WBThrottle.h" #include "common/perf_counters.h" #include "common/errno.h" WBThrottle::WBThrottle(CephContext *cct) : cur_ios(0), cur_size(0), cct(cct), logger(NULL), stopping(true), lock("WBThrottle::lock", false, true, false), fs(XFS) { { Mutex::Locker l(lock); set_from_conf(); } ceph_assert(cct); PerfCountersBuilder b( cct, string("WBThrottle"), l_wbthrottle_first, l_wbthrottle_last); b.add_u64(l_wbthrottle_bytes_dirtied, "bytes_dirtied", "Dirty data", NULL, 0, unit_t(UNIT_BYTES)); b.add_u64(l_wbthrottle_bytes_wb, "bytes_wb", "Written data", NULL, 0, unit_t(UNIT_BYTES)); b.add_u64(l_wbthrottle_ios_dirtied, "ios_dirtied", "Dirty operations"); b.add_u64(l_wbthrottle_ios_wb, "ios_wb", "Written operations"); b.add_u64(l_wbthrottle_inodes_dirtied, "inodes_dirtied", "Entries waiting for write"); b.add_u64(l_wbthrottle_inodes_wb, "inodes_wb", "Written entries"); logger = b.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); for (unsigned i = l_wbthrottle_first + 1; i != l_wbthrottle_last; ++i) logger->set(i, 0); cct->_conf.add_observer(this); } WBThrottle::~WBThrottle() { ceph_assert(cct); cct->get_perfcounters_collection()->remove(logger); delete logger; cct->_conf.remove_observer(this); } void WBThrottle::start() { { Mutex::Locker l(lock); stopping = false; } create("wb_throttle"); } void WBThrottle::stop() { { Mutex::Locker l(lock); stopping = true; cond.Signal(); } join(); } const char** WBThrottle::get_tracked_conf_keys() const { static const char* KEYS[] = { "filestore_wbthrottle_btrfs_bytes_start_flusher", "filestore_wbthrottle_btrfs_bytes_hard_limit", "filestore_wbthrottle_btrfs_ios_start_flusher", "filestore_wbthrottle_btrfs_ios_hard_limit", "filestore_wbthrottle_btrfs_inodes_start_flusher", "filestore_wbthrottle_btrfs_inodes_hard_limit", "filestore_wbthrottle_xfs_bytes_start_flusher", "filestore_wbthrottle_xfs_bytes_hard_limit", "filestore_wbthrottle_xfs_ios_start_flusher", "filestore_wbthrottle_xfs_ios_hard_limit", "filestore_wbthrottle_xfs_inodes_start_flusher", "filestore_wbthrottle_xfs_inodes_hard_limit", NULL }; return KEYS; } void WBThrottle::set_from_conf() { ceph_assert(lock.is_locked()); if (fs == BTRFS) { size_limits.first = cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher; size_limits.second = cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit; io_limits.first = cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher; io_limits.second = cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit; fd_limits.first = cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher; fd_limits.second = cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit; } else if (fs == XFS) { size_limits.first = cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher; size_limits.second = cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit; io_limits.first = cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher; io_limits.second = cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit; fd_limits.first = cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher; fd_limits.second = cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit; } else { ceph_abort_msg("invalid value for fs"); } cond.Signal(); } void WBThrottle::handle_conf_change(const ConfigProxy& conf, const std::set &changed) { Mutex::Locker l(lock); for (const char** i = get_tracked_conf_keys(); *i; ++i) { if (changed.count(*i)) { set_from_conf(); return; } } } bool WBThrottle::get_next_should_flush( boost::tuple *next) { ceph_assert(lock.is_locked()); ceph_assert(next); while (!stopping && (!beyond_limit() || pending_wbs.empty())) cond.Wait(lock); if (stopping) return false; ceph_assert(!pending_wbs.empty()); ghobject_t obj(pop_object()); ceph::unordered_map >::iterator i = pending_wbs.find(obj); *next = boost::make_tuple(obj, i->second.second, i->second.first); pending_wbs.erase(i); return true; } void *WBThrottle::entry() { Mutex::Locker l(lock); boost::tuple wb; while (get_next_should_flush(&wb)) { clearing = wb.get<0>(); cur_ios -= wb.get<2>().ios; logger->dec(l_wbthrottle_ios_dirtied, wb.get<2>().ios); logger->inc(l_wbthrottle_ios_wb, wb.get<2>().ios); cur_size -= wb.get<2>().size; logger->dec(l_wbthrottle_bytes_dirtied, wb.get<2>().size); logger->inc(l_wbthrottle_bytes_wb, wb.get<2>().size); logger->dec(l_wbthrottle_inodes_dirtied); logger->inc(l_wbthrottle_inodes_wb); lock.Unlock(); #if defined(HAVE_FDATASYNC) int r = ::fdatasync(**wb.get<1>()); #else int r = ::fsync(**wb.get<1>()); #endif if (r < 0) { lderr(cct) << "WBThrottle fsync failed: " << cpp_strerror(errno) << dendl; ceph_abort(); } #ifdef HAVE_POSIX_FADVISE if (cct->_conf->filestore_fadvise && wb.get<2>().nocache) { int fa_r = posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED); ceph_assert(fa_r == 0); } #endif lock.Lock(); clearing = ghobject_t(); cond.Signal(); wb = boost::tuple(); } return 0; } void WBThrottle::queue_wb( FDRef fd, const ghobject_t &hoid, uint64_t offset, uint64_t len, bool nocache) { Mutex::Locker l(lock); ceph::unordered_map >::iterator wbiter = pending_wbs.find(hoid); if (wbiter == pending_wbs.end()) { wbiter = pending_wbs.insert( make_pair(hoid, make_pair( PendingWB(), fd))).first; logger->inc(l_wbthrottle_inodes_dirtied); } else { remove_object(hoid); } cur_ios++; logger->inc(l_wbthrottle_ios_dirtied); cur_size += len; logger->inc(l_wbthrottle_bytes_dirtied, len); wbiter->second.first.add(nocache, len, 1); insert_object(hoid); if (beyond_limit()) cond.Signal(); } void WBThrottle::clear() { Mutex::Locker l(lock); for (ceph::unordered_map >::iterator i = pending_wbs.begin(); i != pending_wbs.end(); ++i) { #ifdef HAVE_POSIX_FADVISE if (cct->_conf->filestore_fadvise && i->second.first.nocache) { int fa_r = posix_fadvise(**i->second.second, 0, 0, POSIX_FADV_DONTNEED); ceph_assert(fa_r == 0); } #endif } cur_ios = cur_size = 0; logger->set(l_wbthrottle_ios_dirtied, 0); logger->set(l_wbthrottle_bytes_dirtied, 0); logger->set(l_wbthrottle_inodes_dirtied, 0); pending_wbs.clear(); lru.clear(); rev_lru.clear(); cond.Signal(); } void WBThrottle::clear_object(const ghobject_t &hoid) { Mutex::Locker l(lock); while (clearing == hoid) cond.Wait(lock); ceph::unordered_map >::iterator i = pending_wbs.find(hoid); if (i == pending_wbs.end()) return; cur_ios -= i->second.first.ios; logger->dec(l_wbthrottle_ios_dirtied, i->second.first.ios); cur_size -= i->second.first.size; logger->dec(l_wbthrottle_bytes_dirtied, i->second.first.size); logger->dec(l_wbthrottle_inodes_dirtied); pending_wbs.erase(i); remove_object(hoid); cond.Signal(); } void WBThrottle::throttle() { Mutex::Locker l(lock); while (!stopping && need_flush()) cond.Wait(lock); }