summaryrefslogtreecommitdiffstats
path: root/src/blk/aio
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/blk/aio
parentInitial commit. (diff)
downloadceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.tar.xz
ceph-19fcec84d8d7d21e796c7624e521b60d28ee21ed.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/blk/aio')
-rw-r--r--src/blk/aio/aio.cc124
-rw-r--r--src/blk/aio/aio.h159
2 files changed, 283 insertions, 0 deletions
diff --git a/src/blk/aio/aio.cc b/src/blk/aio/aio.cc
new file mode 100644
index 000000000..00a12bfd1
--- /dev/null
+++ b/src/blk/aio/aio.cc
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <algorithm>
+#include "aio.h"
+
+std::ostream& operator<<(std::ostream& os, const aio_t& aio)
+{
+ unsigned i = 0;
+ os << "aio: ";
+ for (auto& iov : aio.iov) {
+ os << "\n [" << i++ << "] 0x"
+ << std::hex << iov.iov_base << "~" << iov.iov_len << std::dec;
+ }
+ return os;
+}
+
+int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
+ uint16_t aios_size, void *priv,
+ int *retries)
+{
+ // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
+ int attempts = 16;
+ int delay = 125;
+ int r;
+
+ aio_iter cur = begin;
+ struct aio_t *piocb[aios_size];
+ int left = 0;
+ while (cur != end) {
+ cur->priv = priv;
+ *(piocb+left) = &(*cur);
+ ++left;
+ ++cur;
+ }
+ ceph_assert(aios_size >= left);
+ int done = 0;
+ while (left > 0) {
+#if defined(HAVE_LIBAIO)
+ r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done));
+#elif defined(HAVE_POSIXAIO)
+ if (piocb[done]->n_aiocb == 1) {
+ // TODO: consider batching multiple reads together with lio_listio
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
+ piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done];
+ r = aio_read(&piocb[done]->aio.aiocb);
+ } else {
+ struct sigevent sev;
+ sev.sigev_notify = SIGEV_KEVENT;
+ sev.sigev_notify_kqueue = ctx;
+ sev.sigev_value.sival_ptr = piocb[done];
+ r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev);
+ }
+#endif
+ if (r < 0) {
+ if (r == -EAGAIN && attempts-- > 0) {
+ usleep(delay);
+ delay *= 2;
+ (*retries)++;
+ continue;
+ }
+ return r;
+ }
+ ceph_assert(r > 0);
+ done += r;
+ left -= r;
+ attempts = 16;
+ delay = 125;
+ }
+ return done;
+}
+
+int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+#if defined(HAVE_LIBAIO)
+ io_event events[max];
+#elif defined(HAVE_POSIXAIO)
+ struct kevent events[max];
+#endif
+ struct timespec t = {
+ timeout_ms / 1000,
+ (timeout_ms % 1000) * 1000 * 1000
+ };
+
+ int r = 0;
+ do {
+#if defined(HAVE_LIBAIO)
+ r = io_getevents(ctx, 1, max, events, &t);
+#elif defined(HAVE_POSIXAIO)
+ r = kevent(ctx, NULL, 0, events, max, &t);
+ if (r < 0)
+ r = -errno;
+#endif
+ } while (r == -EINTR);
+
+ for (int i=0; i<r; ++i) {
+#if defined(HAVE_LIBAIO)
+ paio[i] = (aio_t *)events[i].obj;
+ paio[i]->rval = events[i].res;
+#else
+ paio[i] = (aio_t*)events[i].udata;
+ if (paio[i]->n_aiocb == 1) {
+ paio[i]->rval = aio_return(&paio[i]->aio.aiocb);
+ } else {
+ // Emulate the return value of pwritev. I can't find any documentation
+ // for what the value of io_event.res is supposed to be. I'm going to
+ // assume that it's just like pwritev/preadv/pwrite/pread.
+ paio[i]->rval = 0;
+ for (int j = 0; j < paio[i]->n_aiocb; j++) {
+ int res = aio_return(&paio[i]->aio.aiocbp[j]);
+ if (res < 0) {
+ paio[i]->rval = res;
+ break;
+ } else {
+ paio[i]->rval += res;
+ }
+ }
+ free(paio[i]->aio.aiocbp);
+ }
+#endif
+ }
+ return r;
+}
diff --git a/src/blk/aio/aio.h b/src/blk/aio/aio.h
new file mode 100644
index 000000000..14b89784b
--- /dev/null
+++ b/src/blk/aio/aio.h
@@ -0,0 +1,159 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+
+#if defined(HAVE_LIBAIO)
+#include <libaio.h>
+#elif defined(HAVE_POSIXAIO)
+#include <aio.h>
+#include <sys/event.h>
+#endif
+
+#include <boost/intrusive/list.hpp>
+#include <boost/container/small_vector.hpp>
+
+#include "include/buffer.h"
+#include "include/types.h"
+
+struct aio_t {
+#if defined(HAVE_LIBAIO)
+ struct iocb iocb{}; // must be first element; see shenanigans in aio_queue_t
+#elif defined(HAVE_POSIXAIO)
+ // static long aio_listio_max = -1;
+ union {
+ struct aiocb aiocb;
+ struct aiocb *aiocbp;
+ } aio;
+ int n_aiocb;
+#endif
+ void *priv;
+ int fd;
+ boost::container::small_vector<iovec,4> iov;
+ uint64_t offset, length;
+ long rval;
+ ceph::buffer::list bl; ///< write payload (so that it remains stable for duration)
+
+ boost::intrusive::list_member_hook<> queue_item;
+
+ aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) {
+ }
+
+ void pwritev(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+#if defined(HAVE_LIBAIO)
+ io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+ n_aiocb = iov.size();
+ aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+ for (int i = 0; i < iov.size(); i++) {
+ aio.aiocbp[i].aio_fildes = fd;
+ aio.aiocbp[i].aio_offset = offset;
+ aio.aiocbp[i].aio_buf = iov[i].iov_base;
+ aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+ aio.aiocbp[i].aio_lio_opcode = LIO_WRITE;
+ offset += iov[i].iov_len;
+ }
+#endif
+ }
+
+ void preadv(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+#if defined(HAVE_LIBAIO)
+ io_prep_preadv(&iocb, fd, &iov[0], iov.size(), offset);
+#elif defined(HAVE_POSIXAIO)
+ n_aiocb = iov.size();
+ aio.aiocbp = (struct aiocb*)calloc(iov.size(), sizeof(struct aiocb));
+ for (size_t i = 0; i < iov.size(); i++) {
+ aio.aiocbp[i].aio_fildes = fd;
+ aio.aiocbp[i].aio_buf = iov[i].iov_base;
+ aio.aiocbp[i].aio_nbytes = iov[i].iov_len;
+ aio.aiocbp[i].aio_offset = offset;
+ aio.aiocbp[i].aio_lio_opcode = LIO_READ;
+ offset += iov[i].iov_len;
+ }
+#endif
+ }
+
+ long get_return_value() {
+ return rval;
+ }
+};
+
+std::ostream& operator<<(std::ostream& os, const aio_t& aio);
+
+typedef boost::intrusive::list<
+ aio_t,
+ boost::intrusive::member_hook<
+ aio_t,
+ boost::intrusive::list_member_hook<>,
+ &aio_t::queue_item> > aio_list_t;
+
+struct io_queue_t {
+ typedef std::list<aio_t>::iterator aio_iter;
+
+ virtual ~io_queue_t() {};
+
+ virtual int init(std::vector<int> &fds) = 0;
+ virtual void shutdown() = 0;
+ virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) = 0;
+ virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
+};
+
+struct aio_queue_t final : public io_queue_t {
+ int max_iodepth;
+#if defined(HAVE_LIBAIO)
+ io_context_t ctx;
+#elif defined(HAVE_POSIXAIO)
+ int ctx;
+#endif
+
+ explicit aio_queue_t(unsigned max_iodepth)
+ : max_iodepth(max_iodepth),
+ ctx(0) {
+ }
+ ~aio_queue_t() final {
+ ceph_assert(ctx == 0);
+ }
+
+ int init(std::vector<int> &fds) final {
+ (void)fds;
+ ceph_assert(ctx == 0);
+#if defined(HAVE_LIBAIO)
+ int r = io_setup(max_iodepth, &ctx);
+ if (r < 0) {
+ if (ctx) {
+ io_destroy(ctx);
+ ctx = 0;
+ }
+ }
+ return r;
+#elif defined(HAVE_POSIXAIO)
+ ctx = kqueue();
+ if (ctx < 0)
+ return -errno;
+ else
+ return 0;
+#endif
+ }
+ void shutdown() final {
+ if (ctx) {
+#if defined(HAVE_LIBAIO)
+ int r = io_destroy(ctx);
+#elif defined(HAVE_POSIXAIO)
+ int r = close(ctx);
+#endif
+ ceph_assert(r == 0);
+ ctx = 0;
+ }
+ }
+
+ int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
+ void *priv, int *retries) final;
+ int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
+};