diff options
Diffstat (limited to 'src/blk/kernel')
-rw-r--r--	src/blk/kernel/KernelDevice.cc	1449
-rw-r--r--	src/blk/kernel/KernelDevice.h	156
-rw-r--r--	src/blk/kernel/io_uring.cc	264
-rw-r--r--	src/blk/kernel/io_uring.h	33
4 files changed, 1902 insertions, 0 deletions
diff --git a/src/blk/kernel/KernelDevice.cc b/src/blk/kernel/KernelDevice.cc new file mode 100644 index 000000000..754b44d32 --- /dev/null +++ b/src/blk/kernel/KernelDevice.cc @@ -0,0 +1,1449 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include <limits> +#include <unistd.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/file.h> +#include <sys/mman.h> + +#include <boost/container/flat_map.hpp> +#include <boost/lockfree/queue.hpp> + +#include "KernelDevice.h" +#include "include/buffer_raw.h" +#include "include/intarith.h" +#include "include/types.h" +#include "include/compat.h" +#include "include/stringify.h" +#include "include/str_map.h" +#include "common/blkdev.h" +#include "common/buffer_instrumentation.h" +#include "common/errno.h" +#if defined(__FreeBSD__) +#include "bsm/audit_errno.h" +#endif +#include "common/debug.h" +#include "common/numa.h" + +#include "global/global_context.h" +#include "io_uring.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_bdev +#undef dout_prefix +#define dout_prefix *_dout << "bdev(" << this << " " << path << ") " + +using std::list; +using std::map; +using std::string; +using std::vector; + +using ceph::bufferlist; +using ceph::bufferptr; +using ceph::make_timespan; +using ceph::mono_clock; +using ceph::operator <<; + +KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv) + : BlockDevice(cct, cb, cbpriv), + aio(false), dio(false), + discard_callback(d_cb), + discard_callback_priv(d_cbpriv), + aio_stop(false), + discard_started(false), + 
discard_stop(false), + aio_thread(this), + discard_thread(this), + injecting_crash(0) +{ + fd_directs.resize(WRITE_LIFE_MAX, -1); + fd_buffereds.resize(WRITE_LIFE_MAX, -1); + + bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring"); + unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth; + + if (use_ioring && ioring_queue_t::supported()) { + bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri"); + bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll"); + io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll); + } else { + static bool once; + if (use_ioring && !once) { + derr << "WARNING: io_uring API is not supported! Fallback to libaio!" + << dendl; + once = true; + } + io_queue = std::make_unique<aio_queue_t>(iodepth); + } +} + +int KernelDevice::_lock() +{ + // When the block changes, systemd-udevd will open the block, + // read some information and close it. Then a failure occurs here. + // So we need to try again here. 
+ int fd = fd_directs[WRITE_LIFE_NOT_SET]; + dout(10) << __func__ << " fd=" << fd << dendl; + uint64_t nr_tries = 0; + for (;;) { + struct flock fl = { .l_type = F_WRLCK, + .l_whence = SEEK_SET }; + int r = ::fcntl(fd, F_OFD_SETLK, &fl); + if (r < 0) { + if (errno == EINVAL) { + r = ::flock(fd, LOCK_EX | LOCK_NB); + } + } + if (r == 0) { + return 0; + } + if (errno != EAGAIN) { + return -errno; + } + dout(1) << __func__ << " flock busy on " << path << dendl; + if (const uint64_t max_retry = + cct->_conf.get_val<uint64_t>("bdev_flock_retry"); + max_retry > 0 && nr_tries++ == max_retry) { + return -EAGAIN; + } + double retry_interval = + cct->_conf.get_val<double>("bdev_flock_retry_interval"); + std::this_thread::sleep_for(ceph::make_timespan(retry_interval)); + } +} + +int KernelDevice::open(const string& p) +{ + path = p; + int r = 0, i = 0; + dout(1) << __func__ << " path " << path << dendl; + + struct stat statbuf; + bool is_block; + r = stat(path.c_str(), &statbuf); + if (r != 0) { + derr << __func__ << " stat got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + is_block = (statbuf.st_mode & S_IFMT) == S_IFBLK; + for (i = 0; i < WRITE_LIFE_MAX; i++) { + int flags = 0; + if (lock_exclusive && is_block && (i == 0)) { + // If opening block device use O_EXCL flag. It gives us best protection, + // as no other process can overwrite the data for as long as we are running. + // For block devices ::flock is not enough, + // since 2 different inodes with same major/minor can be locked. + // Exclusion by O_EXCL works in containers too. 
+ flags |= O_EXCL; + } + int fd = ::open(path.c_str(), O_RDWR | O_DIRECT | flags); + if (fd < 0) { + r = -errno; + break; + } + fd_directs[i] = fd; + + fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC); + if (fd < 0) { + r = -errno; + break; + } + fd_buffereds[i] = fd; + } + + if (i != WRITE_LIFE_MAX) { + derr << __func__ << " open got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + +#if defined(F_SET_FILE_RW_HINT) + for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) { + if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) { + r = -errno; + break; + } + } + if (i != WRITE_LIFE_MAX) { + enable_wrt = false; + dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl; + } +#endif + + dio = true; + aio = cct->_conf->bdev_aio; + if (!aio) { + ceph_abort_msg("non-aio not supported"); + } + + // disable readahead as it will wreak havoc on our mix of + // directio/aio and buffered io. + r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM); + if (r) { + r = -r; + derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl; + goto out_fail; + } + + if (lock_exclusive) { + // We need to keep soft locking (via flock()) because O_EXCL does not work for regular files. + // This is as good as we can get. Other processes can still overwrite the data, + // but at least we are protected from mounting same device twice in ceph processes. + // We also apply soft locking for block devices, as it populates /proc/locks. (see lslocks) + r = _lock(); + if (r < 0) { + derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r) + << dendl; + goto out_fail; + } + } + + struct stat st; + r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st); + if (r < 0) { + r = -errno; + derr << __func__ << " fstat got " << cpp_strerror(r) << dendl; + goto out_fail; + } + + // Operate as though the block size is 4 KB. 
The backing file + // blksize doesn't strictly matter except that some file systems may + // require a read/modify/write if we write something smaller than + // it. + block_size = cct->_conf->bdev_block_size; + if (block_size != (unsigned)st.st_blksize) { + dout(1) << __func__ << " backing device/file reports st_blksize " + << st.st_blksize << ", using bdev_block_size " + << block_size << " anyway" << dendl; + } + + + { + BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]); + BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]); + + if (S_ISBLK(st.st_mode)) { + int64_t s; + r = blkdev_direct.get_size(&s); + if (r < 0) { + goto out_fail; + } + size = s; + } else { + size = st.st_size; + } + + char partition[PATH_MAX], devname[PATH_MAX]; + if ((r = blkdev_buffered.partition(partition, PATH_MAX)) || + (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) { + derr << "unable to get device name for " << path << ": " + << cpp_strerror(r) << dendl; + rotational = true; + } else { + dout(20) << __func__ << " devname " << devname << dendl; + rotational = blkdev_buffered.is_rotational(); + support_discard = blkdev_buffered.support_discard(); + optimal_io_size = blkdev_buffered.get_optimal_io_size(); + this->devname = devname; + // check if any extended block device plugin recognizes this device + // detect_vdo has moved into the VDO plugin + int rc = extblkdev::detect_device(cct, devname, ebd_impl); + if (rc != 0) { + dout(20) << __func__ << " no plugin volume maps to " << devname << dendl; + } + } + } + + r = _post_open(); + if (r < 0) { + goto out_fail; + } + + r = _aio_start(); + if (r < 0) { + goto out_fail; + } + if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) { + _discard_start(); + } + + // round size down to an even block + size &= ~(block_size - 1); + + dout(1) << __func__ + << " size " << size + << " (0x" << std::hex << size << std::dec << ", " + << byte_u_t(size) << ")" + << " block_size " << block_size + << " (" << 
byte_u_t(block_size) << ")" + << " " << (rotational ? "rotational device," : "non-rotational device,") + << " discard " << (support_discard ? "supported" : "not supported") + << dendl; + return 0; + +out_fail: + for (i = 0; i < WRITE_LIFE_MAX; i++) { + if (fd_directs[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + } else { + break; + } + if (fd_buffereds[i] >= 0) { + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } else { + break; + } + } + return r; +} + +int KernelDevice::get_devices(std::set<std::string> *ls) const +{ + if (devname.empty()) { + return 0; + } + get_raw_devices(devname, ls); + return 0; +} + +void KernelDevice::close() +{ + dout(1) << __func__ << dendl; + _aio_stop(); + if (discard_thread.is_started()) { + _discard_stop(); + } + _pre_close(); + + extblkdev::release_device(ebd_impl); + + for (int i = 0; i < WRITE_LIFE_MAX; i++) { + assert(fd_directs[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i])); + fd_directs[i] = -1; + + assert(fd_buffereds[i] >= 0); + VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i])); + fd_buffereds[i] = -1; + } + path.clear(); +} + +int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const +{ + (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard); + (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational); + (*pm)[prefix + "size"] = stringify(get_size()); + (*pm)[prefix + "block_size"] = stringify(get_block_size()); + (*pm)[prefix + "optimal_io_size"] = stringify(get_optimal_io_size()); + (*pm)[prefix + "driver"] = "KernelDevice"; + if (rotational) { + (*pm)[prefix + "type"] = "hdd"; + } else { + (*pm)[prefix + "type"] = "ssd"; + } + // if compression device detected, collect meta data for device + // VDO specific meta data has moved into VDO plugin + if (ebd_impl) { + ebd_impl->collect_metadata(prefix, pm); + } + + { + string res_names; + std::set<std::string> devnames; + if 
(get_devices(&devnames) == 0) { + for (auto& dev : devnames) { + if (!res_names.empty()) { + res_names += ","; + } + res_names += dev; + } + if (res_names.size()) { + (*pm)[prefix + "devices"] = res_names; + } + } + } + + struct stat st; + int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st); + if (r < 0) + return -errno; + if (S_ISBLK(st.st_mode)) { + (*pm)[prefix + "access_mode"] = "blk"; + + char buffer[1024] = {0}; + BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]}; + if (r = blkdev.partition(buffer, sizeof(buffer)); r) { + (*pm)[prefix + "partition_path"] = "unknown"; + } else { + (*pm)[prefix + "partition_path"] = buffer; + } + buffer[0] = '\0'; + if (r = blkdev.partition(buffer, sizeof(buffer)); r) { + (*pm)[prefix + "dev_node"] = "unknown"; + } else { + (*pm)[prefix + "dev_node"] = buffer; + } + if (!r) { + return 0; + } + buffer[0] = '\0'; + blkdev.model(buffer, sizeof(buffer)); + (*pm)[prefix + "model"] = buffer; + + buffer[0] = '\0'; + blkdev.dev(buffer, sizeof(buffer)); + (*pm)[prefix + "dev"] = buffer; + + // nvme exposes a serial number + buffer[0] = '\0'; + blkdev.serial(buffer, sizeof(buffer)); + (*pm)[prefix + "serial"] = buffer; + + // numa + int node; + r = blkdev.get_numa_node(&node); + if (r >= 0) { + (*pm)[prefix + "numa_node"] = stringify(node); + } + } else { + (*pm)[prefix + "access_mode"] = "file"; + (*pm)[prefix + "path"] = path; + } + return 0; +} + +int KernelDevice::get_ebd_state(ExtBlkDevState &state) const +{ + // use compression driver plugin to determine physical size and availability + // VDO specific get_thin_utilization has moved into VDO plugin + if (ebd_impl) { + return ebd_impl->get_state(state); + } + return -ENOENT; +} + +int KernelDevice::choose_fd(bool buffered, int write_hint) const +{ +#if defined(F_SET_FILE_RW_HINT) + if (!enable_wrt) + write_hint = WRITE_LIFE_NOT_SET; +#else + // Without WRITE_LIFE capabilities, only one file is used. 
+ // And rocksdb sets this value also to > 0, so we need to catch this here + // instead of trusting rocksdb to set write_hint. + write_hint = WRITE_LIFE_NOT_SET; +#endif + return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint]; +} + +int KernelDevice::flush() +{ + // protect flush with a mutex. note that we are not really protecting + // data here. instead, we're ensuring that if any flush() caller + // sees that io_since_flush is true, they block any racing callers + // until the flush is observed. that allows racing threads to be + // calling flush while still ensuring that *any* of them that got an + // aio completion notification will not return before that aio is + // stable on disk: whichever thread sees the flag first will block + // followers until the aio is stable. + std::lock_guard l(flush_mutex); + + bool expect = true; + if (!io_since_flush.compare_exchange_strong(expect, false)) { + dout(10) << __func__ << " no-op (no ios since last flush), flag is " + << (int)io_since_flush.load() << dendl; + return 0; + } + + dout(10) << __func__ << " start" << dendl; + if (cct->_conf->bdev_inject_crash) { + ++injecting_crash; + // sleep for a moment to give other threads a chance to submit or + // wait on io that races with a flush. + derr << __func__ << " injecting crash. first we sleep..." 
<< dendl; + sleep(cct->_conf->bdev_inject_crash_flush_delay); + derr << __func__ << " and now we die" << dendl; + cct->_log->flush(); + _exit(1); + } + utime_t start = ceph_clock_now(); + int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]); + utime_t end = ceph_clock_now(); + utime_t dur = end - start; + if (r < 0) { + r = -errno; + derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl; + ceph_abort(); + } + dout(5) << __func__ << " in " << dur << dendl;; + return r; +} + +int KernelDevice::_aio_start() +{ + if (aio) { + dout(10) << __func__ << dendl; + int r = io_queue->init(fd_directs); + if (r < 0) { + if (r == -EAGAIN) { + derr << __func__ << " io_setup(2) failed with EAGAIN; " + << "try increasing /proc/sys/fs/aio-max-nr" << dendl; + } else { + derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl; + } + return r; + } + aio_thread.create("bstore_aio"); + } + return 0; +} + +void KernelDevice::_aio_stop() +{ + if (aio) { + dout(10) << __func__ << dendl; + aio_stop = true; + aio_thread.join(); + aio_stop = false; + io_queue->shutdown(); + } +} + +void KernelDevice::_discard_start() +{ + discard_thread.create("bstore_discard"); +} + +void KernelDevice::_discard_stop() +{ + dout(10) << __func__ << dendl; + { + std::unique_lock l(discard_lock); + while (!discard_started) { + discard_cond.wait(l); + } + discard_stop = true; + discard_cond.notify_all(); + } + discard_thread.join(); + { + std::lock_guard l(discard_lock); + discard_stop = false; + } + dout(10) << __func__ << " stopped" << dendl; +} + +void KernelDevice::discard_drain() +{ + dout(10) << __func__ << dendl; + std::unique_lock l(discard_lock); + while (!discard_queued.empty() || discard_running) { + discard_cond.wait(l); + } +} + +static bool is_expected_ioerr(const int r) +{ + // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135 + return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC || + r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == 
-EIO || + r == -ENODATA || r == -EILSEQ || r == -ENOMEM || +#if defined(__linux__) + r == -EREMCHG || r == -EBADE +#elif defined(__FreeBSD__) + r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE +#endif + ); +} + +void KernelDevice::_aio_thread() +{ + dout(10) << __func__ << " start" << dendl; + int inject_crash_count = 0; + while (!aio_stop) { + dout(40) << __func__ << " polling" << dendl; + int max = cct->_conf->bdev_aio_reap_max; + aio_t *aio[max]; + int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms, + aio, max); + if (r < 0) { + derr << __func__ << " got " << cpp_strerror(r) << dendl; + ceph_abort_msg("got unexpected error from io_getevents"); + } + if (r > 0) { + dout(30) << __func__ << " got " << r << " completed aios" << dendl; + for (int i = 0; i < r; ++i) { + IOContext *ioc = static_cast<IOContext*>(aio[i]->priv); + _aio_log_finish(ioc, aio[i]->offset, aio[i]->length); + if (aio[i]->queue_item.is_linked()) { + std::lock_guard l(debug_queue_lock); + debug_aio_unlink(*aio[i]); + } + + // set flag indicating new ios have completed. we do this *before* + // any completion or notifications so that any user flush() that + // follows the observed io completion will include this io. Note + // that an earlier, racing flush() could observe and clear this + // flag, but that also ensures that the IO will be stable before the + // later flush() occurs. 
+ io_since_flush.store(true); + + long r = aio[i]->get_return_value(); + if (r < 0) { + derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")" + << dendl; + if (ioc->allow_eio && is_expected_ioerr(r)) { + derr << __func__ << " translating the error to EIO for upper layer" + << dendl; + ioc->set_return_value(-EIO); + } else { + if (is_expected_ioerr(r)) { + note_io_error_event( + devname.c_str(), + path.c_str(), + r, +#if defined(HAVE_POSIXAIO) + aio[i]->aio.aiocb.aio_lio_opcode, +#else + aio[i]->iocb.aio_lio_opcode, +#endif + aio[i]->offset, + aio[i]->length); + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest a hardware issue. " + "Please check your kernel log!"); + } + ceph_abort_msg( + "Unexpected IO error. " + "This may suggest HW issue. Please check your dmesg!"); + } + } else if (aio[i]->length != (uint64_t)r) { + derr << "aio to 0x" << std::hex << aio[i]->offset + << "~" << aio[i]->length << std::dec + << " but returned: " << r << dendl; + ceph_abort_msg("unexpected aio return value: does not match length"); + } + + dout(10) << __func__ << " finished aio " << aio[i] << " r " << r + << " ioc " << ioc + << " with " << (ioc->num_running.load() - 1) + << " aios left" << dendl; + + // NOTE: once num_running and we either call the callback or + // call aio_wake we cannot touch ioc or aio[] as the caller + // may free it. 
+ if (ioc->priv) { + if (--ioc->num_running == 0) { + aio_callback(aio_callback_priv, ioc->priv); + } + } else { + ioc->try_aio_wake(); + } + } + } + if (cct->_conf->bdev_debug_aio) { + utime_t now = ceph_clock_now(); + std::lock_guard l(debug_queue_lock); + if (debug_oldest) { + if (debug_stall_since == utime_t()) { + debug_stall_since = now; + } else { + if (cct->_conf->bdev_debug_aio_suicide_timeout) { + utime_t cutoff = now; + cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout; + if (debug_stall_since < cutoff) { + derr << __func__ << " stalled aio " << debug_oldest + << " since " << debug_stall_since << ", timeout is " + << cct->_conf->bdev_debug_aio_suicide_timeout + << "s, suicide" << dendl; + ceph_abort_msg("stalled aio... buggy kernel or bad device?"); + } + } + } + } + } + if (cct->_conf->bdev_inject_crash) { + ++inject_crash_count; + if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 > + cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) { + derr << __func__ << " bdev_inject_crash trigger from aio thread" + << dendl; + cct->_log->flush(); + _exit(1); + } + } + } + dout(10) << __func__ << " end" << dendl; +} + +void KernelDevice::_discard_thread() +{ + std::unique_lock l(discard_lock); + ceph_assert(!discard_started); + discard_started = true; + discard_cond.notify_all(); + while (true) { + ceph_assert(discard_finishing.empty()); + if (discard_queued.empty()) { + if (discard_stop) + break; + dout(20) << __func__ << " sleep" << dendl; + discard_cond.notify_all(); // for the thread trying to drain... 
+ discard_cond.wait(l); + dout(20) << __func__ << " wake" << dendl; + } else { + discard_finishing.swap(discard_queued); + discard_running = true; + l.unlock(); + dout(20) << __func__ << " finishing" << dendl; + for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) { + _discard(p.get_start(), p.get_len()); + } + + discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing)); + discard_finishing.clear(); + l.lock(); + discard_running = false; + } + } + dout(10) << __func__ << " finish" << dendl; + discard_started = false; +} + +int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release) +{ + // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard + if (!discard_thread.is_started()) + return -1; + + if (to_release.empty()) + return 0; + + std::lock_guard l(discard_lock); + discard_queued.insert(to_release); + discard_cond.notify_all(); + return 0; +} + +// return true only if _queue_discard succeeded, so caller won't have to do alloc->release +// otherwise false +bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async) +{ + if (!support_discard || !cct->_conf->bdev_enable_discard) + return false; + + if (async && discard_thread.is_started()) { + return 0 == _queue_discard(to_release); + } else { + for (auto p = to_release.begin(); p != to_release.end(); ++p) { + _discard(p.get_start(), p.get_len()); + } + } + return false; +} + +void KernelDevice::_aio_log_start( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length + << std::dec << dendl; + if (cct->_conf->bdev_debug_inflight_ios) { + std::lock_guard l(debug_lock); + if (debug_inflight.intersects(offset, length)) { + derr << __func__ << " inflight overlap of 0x" + << std::hex + << offset << "~" << length << std::dec + << " with " << debug_inflight << dendl; + ceph_abort(); + } + debug_inflight.insert(offset, length); + } 
+} + +void KernelDevice::debug_aio_link(aio_t& aio) +{ + if (debug_queue.empty()) { + debug_oldest = &aio; + } + debug_queue.push_back(aio); +} + +void KernelDevice::debug_aio_unlink(aio_t& aio) +{ + if (aio.queue_item.is_linked()) { + debug_queue.erase(debug_queue.iterator_to(aio)); + if (debug_oldest == &aio) { + auto age = cct->_conf->bdev_debug_aio_log_age; + if (age && debug_stall_since != utime_t()) { + utime_t cutoff = ceph_clock_now(); + cutoff -= age; + if (debug_stall_since < cutoff) { + derr << __func__ << " stalled aio " << debug_oldest + << " since " << debug_stall_since << ", timeout is " + << age + << "s" << dendl; + } + } + + if (debug_queue.empty()) { + debug_oldest = nullptr; + } else { + debug_oldest = &debug_queue.front(); + } + debug_stall_since = utime_t(); + } + } +} + +void KernelDevice::_aio_log_finish( + IOContext *ioc, + uint64_t offset, + uint64_t length) +{ + dout(20) << __func__ << " " << aio << " 0x" + << std::hex << offset << "~" << length << std::dec << dendl; + if (cct->_conf->bdev_debug_inflight_ios) { + std::lock_guard l(debug_lock); + debug_inflight.erase(offset, length); + } +} + +void KernelDevice::aio_submit(IOContext *ioc) +{ + dout(20) << __func__ << " ioc " << ioc + << " pending " << ioc->num_pending.load() + << " running " << ioc->num_running.load() + << dendl; + + if (ioc->num_pending.load() == 0) { + return; + } + + // move these aside, and get our end iterator position now, as the + // aios might complete as soon as they are submitted and queue more + // wal aio's. 
+ list<aio_t>::iterator e = ioc->running_aios.begin(); + ioc->running_aios.splice(e, ioc->pending_aios); + + int pending = ioc->num_pending.load(); + ioc->num_running += pending; + ioc->num_pending -= pending; + ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this + ceph_assert(ioc->pending_aios.size() == 0); + + if (cct->_conf->bdev_debug_aio) { + list<aio_t>::iterator p = ioc->running_aios.begin(); + while (p != e) { + dout(30) << __func__ << " " << *p << dendl; + std::lock_guard l(debug_queue_lock); + debug_aio_link(*p++); + } + } + + void *priv = static_cast<void*>(ioc); + int r, retries = 0; + // num of pending aios should not overflow when passed to submit_batch() + assert(pending <= std::numeric_limits<uint16_t>::max()); + r = io_queue->submit_batch(ioc->running_aios.begin(), e, + pending, priv, &retries); + + if (retries) + derr << __func__ << " retries " << retries << dendl; + if (r < 0) { + derr << " aio submit got " << cpp_strerror(r) << dendl; + ceph_assert(r == 0); + } +} + +int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint) +{ + uint64_t len = bl.length(); + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " " << buffermode(buffered) << dendl; + if (cct->_conf->bdev_inject_crash && + rand() % cct->_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex + << off << "~" << len << std::dec << dendl; + ++injecting_crash; + return 0; + } + vector<iovec> iov; + bl.prepare_iov(&iov); + + auto left = len; + auto o = off; + size_t idx = 0; + do { + auto r = ::pwritev(choose_fd(buffered, write_hint), + &iov[idx], iov.size() - idx, o); + + if (r < 0) { + r = -errno; + derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl; + return r; + } + o += r; + left -= r; + if (left) { + // skip fully processed IOVs + while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) { + r -= iov[idx++].iov_len; + } + // 
update partially processed one if any + if (r) { + ceph_assert(idx < iov.size()); + ceph_assert((size_t)r < iov[idx].iov_len); + iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r; + iov[idx].iov_len -= r; + r = 0; + } + ceph_assert(r == 0); + } + } while (left); + +#ifdef HAVE_SYNC_FILE_RANGE + if (buffered) { + // initiate IO and wait till it completes + auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE); + if (r < 0) { + r = -errno; + derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl; + return r; + } + } +#endif + + io_since_flush.store(true); + + return 0; +} + +int KernelDevice::write( + uint64_t off, + bufferlist &bl, + bool buffered, + int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " " << buffermode(buffered) + << dendl; + ceph_assert(is_valid_io(off, len)); + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + + if ((!buffered || bl.get_num_buffers() >= IOV_MAX) && + bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) { + dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl; + } + dout(40) << "data:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + + return _sync_write(off, bl, buffered, write_hint); +} + +int KernelDevice::aio_write( + uint64_t off, + bufferlist &bl, + IOContext *ioc, + bool buffered, + int write_hint) +{ + uint64_t len = bl.length(); + dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " " << buffermode(buffered) + << dendl; + ceph_assert(is_valid_io(off, len)); + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + + if ((!buffered || bl.get_num_buffers() >= IOV_MAX) && + 
bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) { + dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl; + } + dout(40) << "data:\n"; + bl.hexdump(*_dout); + *_dout << dendl; + + _aio_log_start(ioc, off, len); + +#ifdef HAVE_LIBAIO + if (aio && dio && !buffered) { + if (cct->_conf->bdev_inject_crash && + rand() % cct->_conf->bdev_inject_crash == 0) { + derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex + << off << "~" << len << std::dec + << dendl; + // generate a real io so that aio_wait behaves properly, but make it + // a read instead of write, and toss the result. + ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + aio.bl.push_back( + ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len))); + aio.bl.prepare_iov(&aio.iov); + aio.preadv(off, len); + ++injecting_crash; + } else { + if (bl.length() <= RW_IO_MAX) { + // fast path (non-huge write) + ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + bl.prepare_iov(&aio.iov); + aio.bl.claim_append(bl); + aio.pwritev(off, len); + dout(30) << aio << dendl; + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " aio " << &aio << dendl; + } else { + // write in RW_IO_MAX-sized chunks + uint64_t prev_len = 0; + while (prev_len < bl.length()) { + bufferlist tmp; + if (prev_len + RW_IO_MAX < bl.length()) { + tmp.substr_of(bl, prev_len, RW_IO_MAX); + } else { + tmp.substr_of(bl, prev_len, bl.length() - prev_len); + } + auto len = tmp.length(); + ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint))); + ++ioc->num_pending; + auto& aio = ioc->pending_aios.back(); + tmp.prepare_iov(&aio.iov); + aio.bl.claim_append(tmp); + aio.pwritev(off + prev_len, len); + dout(30) << aio << dendl; + dout(5) << __func__ << " 0x" << std::hex << off + prev_len + 
<< "~" << len + << std::dec << " aio " << &aio << " (piece)" << dendl; + prev_len += len; + } + } + } + } else +#endif + { + int r = _sync_write(off, bl, buffered, write_hint); + _aio_log_finish(ioc, off, len); + if (r < 0) + return r; + } + return 0; +} + +int KernelDevice::_discard(uint64_t offset, uint64_t len) +{ + int r = 0; + if (cct->_conf->objectstore_blackhole) { + lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO" + << dendl; + return 0; + } + dout(10) << __func__ + << " 0x" << std::hex << offset << "~" << len << std::dec + << dendl; + r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len); + return r; +} + +struct ExplicitHugePagePool { + using region_queue_t = boost::lockfree::queue<void*>; + using instrumented_raw = ceph::buffer_instrumentation::instrumented_raw< + BlockDevice::hugepaged_raw_marker_t>; + + struct mmaped_buffer_raw : public instrumented_raw { + region_queue_t& region_q; // for recycling + + mmaped_buffer_raw(void* mmaped_region, ExplicitHugePagePool& parent) + : instrumented_raw(static_cast<char*>(mmaped_region), parent.buffer_size), + region_q(parent.region_q) { + // the `mmaped_region` has been passed to `raw` as the buffer's `data` + } + ~mmaped_buffer_raw() override { + // don't delete nor unmmap; recycle the region instead + region_q.push(data); + } + }; + + ExplicitHugePagePool(const size_t buffer_size, size_t buffers_in_pool) + : buffer_size(buffer_size), region_q(buffers_in_pool) { + while (buffers_in_pool--) { + void* const mmaped_region = ::mmap( + nullptr, + buffer_size, + PROT_READ | PROT_WRITE, +#if defined(__FreeBSD__) + // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has + // a different, more automated / implicit mechanisms. However, + // we want to mimic the Linux behavior as closely as possible + // also in the matter of error handling which is the reason + // behind MAP_ALIGNED_SUPER. 
+ // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html + MAP_PRIVATE | MAP_ANONYMOUS | MAP_PREFAULT_READ | MAP_ALIGNED_SUPER, +#else + MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, +#endif // __FreeBSD__ + -1, + 0); + if (mmaped_region == MAP_FAILED) { + ceph_abort("can't allocate huge buffer;" + " /proc/sys/vm/nr_hugepages misconfigured?"); + } else { + region_q.push(mmaped_region); + } + } + } + ~ExplicitHugePagePool() { + void* mmaped_region; + while (region_q.pop(mmaped_region)) { + ::munmap(mmaped_region, buffer_size); + } + } + + ceph::unique_leakable_ptr<buffer::raw> try_create() { + if (void* mmaped_region; region_q.pop(mmaped_region)) { + return ceph::unique_leakable_ptr<buffer::raw> { + new mmaped_buffer_raw(mmaped_region, *this) + }; + } else { + // oops, empty queue. + return nullptr; + } + } + + size_t get_buffer_size() const { + return buffer_size; + } + +private: + const size_t buffer_size; + region_queue_t region_q; +}; + +struct HugePagePoolOfPools { + HugePagePoolOfPools(const std::map<size_t, size_t> conf) + : pools(conf.size(), [conf] (size_t index, auto emplacer) { + ceph_assert(index < conf.size()); + // it could be replaced with a state-mutating lambda and + // `conf::erase()` but performance is not a concern here. + const auto [buffer_size, buffers_in_pool] = + *std::next(std::begin(conf), index); + emplacer.emplace(buffer_size, buffers_in_pool); + }) { + } + + ceph::unique_leakable_ptr<buffer::raw> try_create(const size_t size) { + // thankfully to `conf` being a `std::map` we store the pools + // sorted by buffer sizes. this would allow to clamp to log(n) + // but I doubt admins want to have dozens of accelerated buffer + // size. let's keep this simple for now. 
+ if (auto iter = std::find_if(std::begin(pools), std::end(pools), + [size] (const auto& pool) { + return size == pool.get_buffer_size(); + }); + iter != std::end(pools)) { + return iter->try_create(); + } + return nullptr; + } + + static HugePagePoolOfPools from_desc(const std::string& conf); + +private: + // let's have some space inside (for 2 MB and 4 MB perhaps?) + // NOTE: we need tiny_vector as the boost::lockfree queue inside + // pool is not-movable. + ceph::containers::tiny_vector<ExplicitHugePagePool, 2> pools; +}; + + +HugePagePoolOfPools HugePagePoolOfPools::from_desc(const std::string& desc) { + std::map<size_t, size_t> conf; // buffer_size -> buffers_in_pool + std::map<std::string, std::string> exploded_str_conf; + get_str_map(desc, &exploded_str_conf); + for (const auto& [buffer_size_s, buffers_in_pool_s] : exploded_str_conf) { + size_t buffer_size, buffers_in_pool; + if (sscanf(buffer_size_s.c_str(), "%zu", &buffer_size) != 1) { + ceph_abort("can't parse a key in the configuration"); + } + if (sscanf(buffers_in_pool_s.c_str(), "%zu", &buffers_in_pool) != 1) { + ceph_abort("can't parse a value in the configuration"); + } + conf[buffer_size] = buffers_in_pool; + } + return HugePagePoolOfPools{std::move(conf)}; +} + +// create a buffer basing on user-configurable. it's intended to make +// our buffers THP-able. +ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned( + const size_t len, + IOContext* const ioc) const +{ + // just to preserve the logic of create_small_page_aligned(). 
+ if (len < CEPH_PAGE_SIZE) { + return ceph::buffer::create_small_page_aligned(len); + } else { + static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc( + cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers") + ); + if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) { + dout(20) << __func__ << " allocated from huge pool" + << " lucky_raw.data=" << (void*)lucky_raw->get_data() + << " bdev_read_preallocated_huge_buffers=" + << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers") + << dendl; + ioc->flags |= IOContext::FLAG_DONT_CACHE; + return lucky_raw; + } else { + // fallthrough due to empty buffer pool. this can happen also + // when the configurable was explicitly set to 0. + dout(20) << __func__ << " cannot allocate from huge pool" + << dendl; + } + } + const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment; + dout(20) << __func__ << " with the custom alignment;" + << " len=" << len + << " custom_alignment=" << custom_alignment + << dendl; + return ceph::buffer::create_aligned(len, custom_alignment); +} + +int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, + IOContext *ioc, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " " << buffermode(buffered) + << dendl; + ceph_assert(is_valid_io(off, len)); + + _aio_log_start(ioc, off, len); + + auto start1 = mono_clock::now(); + + auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)); + int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET), + p->c_str(), len, off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " " << buffermode(buffered) + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + if (r < 0) { + if (ioc->allow_eio && is_expected_ioerr(-errno)) { + r = -EIO; + } else { 
+ r = -errno; + } + derr << __func__ << " 0x" << std::hex << off << "~" << std::left + << std::dec << " error: " << cpp_strerror(r) << dendl; + goto out; + } + ceph_assert((uint64_t)r == len); + pbl->push_back(std::move(p)); + + dout(40) << "data:\n"; + pbl->hexdump(*_dout); + *_dout << dendl; + + out: + _aio_log_finish(ioc, off, len); + return r < 0 ? r : 0; +} + +int KernelDevice::aio_read( + uint64_t off, + uint64_t len, + bufferlist *pbl, + IOContext *ioc) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + + int r = 0; +#ifdef HAVE_LIBAIO + if (aio && dio) { + ceph_assert(is_valid_io(off, len)); + _aio_log_start(ioc, off, len); + ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET])); + ++ioc->num_pending; + aio_t& aio = ioc->pending_aios.back(); + aio.bl.push_back( + ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc))); + aio.bl.prepare_iov(&aio.iov); + aio.preadv(off, len); + dout(30) << aio << dendl; + pbl->append(aio.bl); + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len + << std::dec << " aio " << &aio << dendl; + } else +#endif + { + r = read(off, len, pbl, ioc, false); + } + + return r; +} + +int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf) +{ + uint64_t aligned_off = p2align(off, block_size); + uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off; + bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len); + int r = 0; + + auto start1 = mono_clock::now(); + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off); + auto age = cct->_conf->bdev_debug_aio_log_age; + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + + if (r < 0) { + r = -errno; + derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + 
<< " error: " << cpp_strerror(r) << dendl; + goto out; + } + ceph_assert((uint64_t)r == aligned_len); + memcpy(buf, p.c_str() + (off - aligned_off), len); + + dout(40) << __func__ << " data:\n"; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? r : 0; +} + +int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf, + bool buffered) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << "buffered " << buffered + << dendl; + ceph_assert(len > 0); + ceph_assert(off < size); + ceph_assert(off + len <= size); + int r = 0; + auto age = cct->_conf->bdev_debug_aio_log_age; + + //if it's direct io and unaligned, we have to use a internal buffer + if (!buffered && ((off % block_size != 0) + || (len % block_size != 0) + || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0))) + return direct_read_unaligned(off, len, buf); + + auto start1 = mono_clock::now(); + if (buffered) { + //buffered read + auto off0 = off; + char *t = buf; + uint64_t left = len; + while (left > 0) { + r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off); + if (r < 0) { + r = -errno; + derr << __func__ << " 0x" << std::hex << off << "~" << left + << std::dec << " error: " << cpp_strerror(r) << dendl; + goto out; + } + off += r; + t += r; + left -= r; + } + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off0 << "~" << len << std::dec + << " (buffered) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + } else { + //direct and aligned read + r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off); + if (mono_clock::now() - start1 >= make_timespan(age)) { + derr << __func__ << " stalled read " + << " 0x" << std::hex << off << "~" << len << std::dec + << " (direct) since " << start1 << ", timeout is " + << age + << "s" << dendl; + } + if (r < 0) { + r = -errno; + derr << __func__ << " direct_aligned_read" << " 0x" << std::hex 
+ << off << "~" << std::left << std::dec << " error: " << cpp_strerror(r) + << dendl; + goto out; + } + ceph_assert((uint64_t)r == len); + } + + dout(40) << __func__ << " data:\n"; + bufferlist bl; + bl.append(buf, len); + bl.hexdump(*_dout); + *_dout << dendl; + + out: + return r < 0 ? r : 0; +} + +int KernelDevice::invalidate_cache(uint64_t off, uint64_t len) +{ + dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << dendl; + ceph_assert(off % block_size == 0); + ceph_assert(len % block_size == 0); + int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED); + if (r) { + r = -r; + derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec + << " error: " << cpp_strerror(r) << dendl; + } + return r; +} diff --git a/src/blk/kernel/KernelDevice.h b/src/blk/kernel/KernelDevice.h new file mode 100644 index 000000000..e00e31f10 --- /dev/null +++ b/src/blk/kernel/KernelDevice.h @@ -0,0 +1,156 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2014 Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef CEPH_BLK_KERNELDEVICE_H +#define CEPH_BLK_KERNELDEVICE_H + +#include <atomic> + +#include "include/types.h" +#include "include/interval_set.h" +#include "common/Thread.h" +#include "include/utime.h" + +#include "aio/aio.h" +#include "BlockDevice.h" +#include "extblkdev/ExtBlkDevPlugin.h" + +#define RW_IO_MAX (INT_MAX & CEPH_PAGE_MASK) + +class KernelDevice : public BlockDevice { +protected: + std::string path; +private: + std::vector<int> fd_directs, fd_buffereds; + bool enable_wrt = true; + bool aio, dio; + + ExtBlkDevInterfaceRef ebd_impl; // structure for retrieving compression state from extended block device + + std::string devname; ///< kernel dev name (/sys/block/$devname), if any + + ceph::mutex debug_lock = ceph::make_mutex("KernelDevice::debug_lock"); + interval_set<uint64_t> debug_inflight; + + std::atomic<bool> io_since_flush = {false}; + ceph::mutex flush_mutex = ceph::make_mutex("KernelDevice::flush_mutex"); + + std::unique_ptr<io_queue_t> io_queue; + aio_callback_t discard_callback; + void *discard_callback_priv; + bool aio_stop; + bool discard_started; + bool discard_stop; + + ceph::mutex discard_lock = ceph::make_mutex("KernelDevice::discard_lock"); + ceph::condition_variable discard_cond; + bool discard_running = false; + interval_set<uint64_t> discard_queued; + interval_set<uint64_t> discard_finishing; + + struct AioCompletionThread : public Thread { + KernelDevice *bdev; + explicit AioCompletionThread(KernelDevice *b) : bdev(b) {} + void *entry() override { + bdev->_aio_thread(); + return NULL; + } + } aio_thread; + + struct DiscardThread : public Thread { + KernelDevice *bdev; + explicit DiscardThread(KernelDevice *b) : bdev(b) {} + void *entry() override { + bdev->_discard_thread(); + return NULL; + } + } discard_thread; + + std::atomic_int injecting_crash; + + virtual int _post_open() { return 0; } // hook for child implementations + virtual void _pre_close() { } // hook for child implementations + + void _aio_thread(); + 
void _discard_thread(); + int _queue_discard(interval_set<uint64_t> &to_release); + bool try_discard(interval_set<uint64_t> &to_release, bool async = true) override; + + int _aio_start(); + void _aio_stop(); + + void _discard_start(); + void _discard_stop(); + + void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length); + void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length); + + int _sync_write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint); + + int _lock(); + + int direct_read_unaligned(uint64_t off, uint64_t len, char *buf); + + // stalled aio debugging + aio_list_t debug_queue; + ceph::mutex debug_queue_lock = ceph::make_mutex("KernelDevice::debug_queue_lock"); + aio_t *debug_oldest = nullptr; + utime_t debug_stall_since; + void debug_aio_link(aio_t& aio); + void debug_aio_unlink(aio_t& aio); + + int choose_fd(bool buffered, int write_hint) const; + + ceph::unique_leakable_ptr<buffer::raw> create_custom_aligned(size_t len, IOContext* ioc) const; + +public: + KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv); + + void aio_submit(IOContext *ioc) override; + void discard_drain() override; + + int collect_metadata(const std::string& prefix, std::map<std::string,std::string> *pm) const override; + int get_devname(std::string *s) const override { + if (devname.empty()) { + return -ENOENT; + } + *s = devname; + return 0; + } + int get_devices(std::set<std::string> *ls) const override; + + int get_ebd_state(ExtBlkDevState &state) const override; + + int read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, + IOContext *ioc, + bool buffered) override; + int aio_read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, + IOContext *ioc) override; + int read_random(uint64_t off, uint64_t len, char *buf, bool buffered) override; + + int write(uint64_t off, ceph::buffer::list& bl, bool buffered, int write_hint = WRITE_LIFE_NOT_SET) override; + int aio_write(uint64_t 
off, ceph::buffer::list& bl, + IOContext *ioc, + bool buffered, + int write_hint = WRITE_LIFE_NOT_SET) override; + int flush() override; + int _discard(uint64_t offset, uint64_t len); + + // for managing buffered readers/writers + int invalidate_cache(uint64_t off, uint64_t len) override; + int open(const std::string& path) override; + void close() override; +}; + +#endif diff --git a/src/blk/kernel/io_uring.cc b/src/blk/kernel/io_uring.cc new file mode 100644 index 000000000..5e7fd1227 --- /dev/null +++ b/src/blk/kernel/io_uring.cc @@ -0,0 +1,264 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "io_uring.h" + +#if defined(HAVE_LIBURING) + +#include "liburing.h" +#include <sys/epoll.h> + +using std::list; +using std::make_unique; + +struct ioring_data { + struct io_uring io_uring; + pthread_mutex_t cq_mutex; + pthread_mutex_t sq_mutex; + int epoll_fd = -1; + std::map<int, int> fixed_fds_map; +}; + +static int ioring_get_cqe(struct ioring_data *d, unsigned int max, + struct aio_t **paio) +{ + struct io_uring *ring = &d->io_uring; + struct io_uring_cqe *cqe; + + unsigned nr = 0; + unsigned head; + io_uring_for_each_cqe(ring, head, cqe) { + struct aio_t *io = (struct aio_t *)(uintptr_t) io_uring_cqe_get_data(cqe); + io->rval = cqe->res; + + paio[nr++] = io; + + if (nr == max) + break; + } + io_uring_cq_advance(ring, nr); + + return nr; +} + +static int find_fixed_fd(struct ioring_data *d, int real_fd) +{ + auto it = d->fixed_fds_map.find(real_fd); + if (it == d->fixed_fds_map.end()) + return -1; + + return it->second; +} + +static void init_sqe(struct ioring_data *d, struct io_uring_sqe *sqe, + struct aio_t *io) +{ + int fixed_fd = find_fixed_fd(d, io->fd); + + ceph_assert(fixed_fd != -1); + + if (io->iocb.aio_lio_opcode == IO_CMD_PWRITEV) + io_uring_prep_writev(sqe, fixed_fd, &io->iov[0], + io->iov.size(), io->offset); + else if (io->iocb.aio_lio_opcode == IO_CMD_PREADV) + io_uring_prep_readv(sqe, 
fixed_fd, &io->iov[0], + io->iov.size(), io->offset); + else + ceph_assert(0); + + io_uring_sqe_set_data(sqe, io); + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); +} + +static int ioring_queue(struct ioring_data *d, void *priv, + list<aio_t>::iterator beg, list<aio_t>::iterator end) +{ + struct io_uring *ring = &d->io_uring; + struct aio_t *io = nullptr; + + ceph_assert(beg != end); + + do { + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + if (!sqe) + break; + + io = &*beg; + io->priv = priv; + + init_sqe(d, sqe, io); + + } while (++beg != end); + + if (!io) + /* Queue is full, go and reap something first */ + return 0; + + return io_uring_submit(ring); +} + +static void build_fixed_fds_map(struct ioring_data *d, + std::vector<int> &fds) +{ + int fixed_fd = 0; + for (int real_fd : fds) { + d->fixed_fds_map[real_fd] = fixed_fd++; + } +} + +ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) : + d(make_unique<ioring_data>()), + iodepth(iodepth_), + hipri(hipri_), + sq_thread(sq_thread_) +{ +} + +ioring_queue_t::~ioring_queue_t() +{ +} + +int ioring_queue_t::init(std::vector<int> &fds) +{ + unsigned flags = 0; + + pthread_mutex_init(&d->cq_mutex, NULL); + pthread_mutex_init(&d->sq_mutex, NULL); + + if (hipri) + flags |= IORING_SETUP_IOPOLL; + if (sq_thread) + flags |= IORING_SETUP_SQPOLL; + + int ret = io_uring_queue_init(iodepth, &d->io_uring, flags); + if (ret < 0) + return ret; + + ret = io_uring_register_files(&d->io_uring, + &fds[0], fds.size()); + if (ret < 0) { + ret = -errno; + goto close_ring_fd; + } + + build_fixed_fds_map(d.get(), fds); + + d->epoll_fd = epoll_create1(0); + if (d->epoll_fd < 0) { + ret = -errno; + goto close_ring_fd; + } + + struct epoll_event ev; + ev.events = EPOLLIN; + ret = epoll_ctl(d->epoll_fd, EPOLL_CTL_ADD, d->io_uring.ring_fd, &ev); + if (ret < 0) { + ret = -errno; + goto close_epoll_fd; + } + + return 0; + +close_epoll_fd: + close(d->epoll_fd); +close_ring_fd: + io_uring_queue_exit(&d->io_uring); 
+ + return ret; +} + +void ioring_queue_t::shutdown() +{ + d->fixed_fds_map.clear(); + close(d->epoll_fd); + d->epoll_fd = -1; + io_uring_queue_exit(&d->io_uring); +} + +int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + (void)aios_size; + (void)retries; + + pthread_mutex_lock(&d->sq_mutex); + int rc = ioring_queue(d.get(), priv, beg, end); + pthread_mutex_unlock(&d->sq_mutex); + + return rc; +} + +int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ +get_cqe: + pthread_mutex_lock(&d->cq_mutex); + int events = ioring_get_cqe(d.get(), max, paio); + pthread_mutex_unlock(&d->cq_mutex); + + if (events == 0) { + struct epoll_event ev; + int ret = TEMP_FAILURE_RETRY(epoll_wait(d->epoll_fd, &ev, 1, timeout_ms)); + if (ret < 0) + events = -errno; + else if (ret > 0) + /* Time to reap */ + goto get_cqe; + } + + return events; +} + +bool ioring_queue_t::supported() +{ + struct io_uring ring; + int ret = io_uring_queue_init(16, &ring, 0); + if (ret) { + return false; + } + io_uring_queue_exit(&ring); + return true; +} + +#else // #if defined(HAVE_LIBURING) + +struct ioring_data {}; + +ioring_queue_t::ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_) +{ + ceph_assert(0); +} + +ioring_queue_t::~ioring_queue_t() +{ + ceph_assert(0); +} + +int ioring_queue_t::init(std::vector<int> &fds) +{ + ceph_assert(0); +} + +void ioring_queue_t::shutdown() +{ + ceph_assert(0); +} + +int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + ceph_assert(0); +} + +int ioring_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ + ceph_assert(0); +} + +bool ioring_queue_t::supported() +{ + return false; +} + +#endif // #if defined(HAVE_LIBURING) diff --git a/src/blk/kernel/io_uring.h b/src/blk/kernel/io_uring.h new file mode 100644 index 000000000..e7d0acde0 --- /dev/null +++ b/src/blk/kernel/io_uring.h @@ -0,0 +1,33 
@@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "acconfig.h" + +#include "include/types.h" +#include "aio/aio.h" + +struct ioring_data; + +struct ioring_queue_t final : public io_queue_t { + std::unique_ptr<ioring_data> d; + unsigned iodepth = 0; + bool hipri = false; + bool sq_thread = false; + + typedef std::list<aio_t>::iterator aio_iter; + + // Returns true if arch is x86-64 and kernel supports io_uring + static bool supported(); + + ioring_queue_t(unsigned iodepth_, bool hipri_, bool sq_thread_); + ~ioring_queue_t() final; + + int init(std::vector<int> &fds) final; + void shutdown() final; + + int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) final; + int get_next_completed(int timeout_ms, aio_t **paio, int max) final; +}; |