From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- src/blk/aio/aio.cc | 124 +++++++++++++++++++++++++++++++++++++++++ src/blk/aio/aio.h | 159 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+) create mode 100644 src/blk/aio/aio.cc create mode 100644 src/blk/aio/aio.h (limited to 'src/blk/aio') diff --git a/src/blk/aio/aio.cc b/src/blk/aio/aio.cc new file mode 100644 index 000000000..00a12bfd1 --- /dev/null +++ b/src/blk/aio/aio.cc @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include +#include "aio.h" + +std::ostream& operator<<(std::ostream& os, const aio_t& aio) +{ + unsigned i = 0; + os << "aio: "; + for (auto& iov : aio.iov) { + os << "\n [" << i++ << "] 0x" + << std::hex << iov.iov_base << "~" << iov.iov_len << std::dec; + } + return os; +} + +int aio_queue_t::submit_batch(aio_iter begin, aio_iter end, + uint16_t aios_size, void *priv, + int *retries) +{ + // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds + int attempts = 16; + int delay = 125; + int r; + + aio_iter cur = begin; + struct aio_t *piocb[aios_size]; + int left = 0; + while (cur != end) { + cur->priv = priv; + *(piocb+left) = &(*cur); + ++left; + ++cur; + } + ceph_assert(aios_size >= left); + int done = 0; + while (left > 0) { +#if defined(HAVE_LIBAIO) + r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done)); +#elif defined(HAVE_POSIXAIO) + if (piocb[done]->n_aiocb == 1) { + // TODO: consider batching multiple reads together with lio_listio + piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT; + piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx; + piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done]; + r = aio_read(&piocb[done]->aio.aiocb); + } else { + struct sigevent sev; + sev.sigev_notify = SIGEV_KEVENT; + sev.sigev_notify_kqueue = ctx; + sev.sigev_value.sival_ptr = piocb[done]; + r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev); + } +#endif + if (r < 0) { + if (r == -EAGAIN && attempts-- > 0) { + usleep(delay); + delay *= 2; + (*retries)++; + continue; + } + return r; + } + ceph_assert(r > 0); + done += r; + left -= r; + attempts = 16; + delay = 125; + } + return done; +} + +int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max) +{ +#if defined(HAVE_LIBAIO) + io_event events[max]; +#elif defined(HAVE_POSIXAIO) + struct kevent events[max]; +#endif + struct timespec t = { + timeout_ms / 1000, + (timeout_ms % 1000) * 1000 * 1000 + }; + + int r = 0; + do { +#if defined(HAVE_LIBAIO) + r = io_getevents(ctx, 1, max, events, &t); +#elif defined(HAVE_POSIXAIO) + r = kevent(ctx, NULL, 0, events, max, &t); + if (r < 0) + r = -errno; +#endif + } while (r == -EINTR); + + for (int i=0; irval = events[i].res; +#else + paio[i] = (aio_t*)events[i].udata; + if (paio[i]->n_aiocb == 1) { + paio[i]->rval = aio_return(&paio[i]->aio.aiocb); + } else { + // Emulate the return value of pwritev. I can't find any documentation + // for what the value of io_event.res is supposed to be. I'm going to + // assume that it's just like pwritev/preadv/pwrite/pread. + paio[i]->rval = 0; + for (int j = 0; j < paio[i]->n_aiocb; j++) { + int res = aio_return(&paio[i]->aio.aiocbp[j]); + if (res < 0) { + paio[i]->rval = res; + break; + } else { + paio[i]->rval += res; + } + } + free(paio[i]->aio.aiocbp); + } +#endif + } + return r; +} diff --git a/src/blk/aio/aio.h b/src/blk/aio/aio.h new file mode 100644 index 000000000..14b89784b --- /dev/null +++ b/src/blk/aio/aio.h @@ -0,0 +1,159 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "acconfig.h" + +#if defined(HAVE_LIBAIO) +#include +#elif defined(HAVE_POSIXAIO) +#include +#include +#endif + +#include +#include + +#include "include/buffer.h" +#include "include/types.h" + +struct aio_t { +#if defined(HAVE_LIBAIO) + struct iocb iocb{}; // must be first element; see shenanigans in aio_queue_t +#elif defined(HAVE_POSIXAIO) + // static long aio_listio_max = -1; + union { + struct aiocb aiocb; + struct aiocb *aiocbp; + } aio; + int n_aiocb; +#endif + void *priv; + int fd; + boost::container::small_vector iov; + uint64_t offset, length; + long rval; + ceph::buffer::list bl; ///< write payload (so that it remains stable for duration) + + boost::intrusive::list_member_hook<> queue_item; + + aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) { + } + + void pwritev(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; +#if defined(HAVE_LIBAIO) + io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset); +#elif defined(HAVE_POSIXAIO) + n_aiocb = iov.size(); + aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb)); + for (int i = 0; i < iov.size(); i++) { + aio.aiocbp[i].aio_fildes = fd; + aio.aiocbp[i].aio_offset = offset; + aio.aiocbp[i].aio_buf = iov[i].iov_base; + aio.aiocbp[i].aio_nbytes = iov[i].iov_len; + aio.aiocbp[i].aio_lio_opcode = LIO_WRITE; + offset += iov[i].iov_len; + } +#endif + } + + void preadv(uint64_t _offset, uint64_t len) { + offset = _offset; + length = len; +#if defined(HAVE_LIBAIO) + io_prep_preadv(&iocb, fd, &iov[0], iov.size(), offset); +#elif defined(HAVE_POSIXAIO) + n_aiocb = iov.size(); + aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb)); + for (size_t i = 0; i < iov.size(); i++) { + aio.aiocbp[i].aio_fildes = fd; + aio.aiocbp[i].aio_buf = iov[i].iov_base; + aio.aiocbp[i].aio_nbytes = iov[i].iov_len; + aio.aiocbp[i].aio_offset = offset; + aio.aiocbp[i].aio_lio_opcode = LIO_READ; + offset += iov[i].iov_len; + } +#endif + } + + long get_return_value() { + return rval; + } +}; + +std::ostream& operator<<(std::ostream& os, const aio_t& aio); + +typedef boost::intrusive::list< + aio_t, + boost::intrusive::member_hook< + aio_t, + boost::intrusive::list_member_hook<>, + &aio_t::queue_item> > aio_list_t; + +struct io_queue_t { + typedef std::list::iterator aio_iter; + + virtual ~io_queue_t() {}; + + virtual int init(std::vector &fds) = 0; + virtual void shutdown() = 0; + virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) = 0; + virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0; +}; + +struct aio_queue_t final : public io_queue_t { + int max_iodepth; +#if defined(HAVE_LIBAIO) + io_context_t ctx; +#elif defined(HAVE_POSIXAIO) + int ctx; +#endif + + explicit aio_queue_t(unsigned max_iodepth) + : max_iodepth(max_iodepth), + ctx(0) { + } + ~aio_queue_t() final { + ceph_assert(ctx == 0); + } + + int init(std::vector &fds) final { + (void)fds; + ceph_assert(ctx == 0); +#if defined(HAVE_LIBAIO) + int r = io_setup(max_iodepth, &ctx); + if (r < 0) { + if (ctx) { + io_destroy(ctx); + ctx = 0; + } + } + return r; +#elif defined(HAVE_POSIXAIO) + ctx = kqueue(); + if (ctx < 0) + return -errno; + else + return 0; +#endif + } + void shutdown() final { + if (ctx) { +#if defined(HAVE_LIBAIO) + int r = io_destroy(ctx); +#elif defined(HAVE_POSIXAIO) + int r = close(ctx); +#endif + ceph_assert(r == 0); + ctx = 0; + } + } + + int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size, + void *priv, int *retries) final; + int get_next_completed(int timeout_ms, aio_t **paio, int max) final; +}; -- cgit v1.2.3