summaryrefslogtreecommitdiffstats
path: root/tpool/aio_linux.cc
diff options
context:
space:
mode:
Diffstat (limited to 'tpool/aio_linux.cc')
-rw-r--r--tpool/aio_linux.cc189
1 files changed, 189 insertions, 0 deletions
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
new file mode 100644
index 00000000..507c6b92
--- /dev/null
+++ b/tpool/aio_linux.cc
@@ -0,0 +1,189 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include "tpool.h"
+
+# include <thread>
+# include <atomic>
+# include <cstdio>
+# include <libaio.h>
+# include <sys/syscall.h>
+
+/**
+ Invoke the io_getevents() system call, without timeout parameter.
+
+ @param ctx context from io_setup()
+ @param min_nr minimum number of completion events to wait for
+ @param nr maximum number of completion events to collect
+ @param ev the collected events
+
+ In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
+ the io_getevents() implementation in libaio was "optimized" so that it
+ would elide the system call when there are no outstanding requests
+ and a timeout was specified.
+
+ The libaio code for dereferencing ctx would occasionally trigger
+ SIGSEGV if io_destroy() was concurrently invoked from another thread.
+ Hence, we have to use the raw system call.
+
+ WHY are we doing this at all?
+ Because we want io_destroy() from another thread to interrupt io_getevents().
+
+ And, WHY do we want io_destroy() from another thread to interrupt
+ io_getevents()?
+
+ Because there is no documented, libaio-friendly and race-condition-free way to
+ interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
+ for us so far.
+
+ Historical note : in the past, we used io_getevents with timeouts. We'd wake
+ up periodically, check for shutdown flag, return from the main routine.
+ This was admittedly safer, yet it did cost periodic wakeups, which we are not
+ willing to do anymore.
+
+ @note we also rely on the undocumented property, that io_destroy(ctx)
+ will make this version of io_getevents return EINVAL.
+*/
+static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
+{
+ int saved_errno= errno;
+ int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
+ min_nr, nr, ev, 0);
+ if (ret < 0)
+ {
+ ret= -errno;
+ errno= saved_errno;
+ }
+ return ret;
+}
+
+
+/*
+ Linux AIO implementation, based on native AIO.
+ Needs libaio.h and -laio at the compile time.
+
+ io_submit() is used to submit async IO.
+
+ A single thread will collect the completion notification
+ with io_getevents() and forward io completion callback to
+ the worker threadpool.
+*/
+namespace tpool
+{
+
+class aio_linux final : public aio
+{
+ thread_pool *m_pool;
+ io_context_t m_io_ctx;
+ std::thread m_getevent_thread;
+ static std::atomic<bool> shutdown_in_progress;
+
+ static void getevent_thread_routine(aio_linux *aio)
+ {
+ /*
+ We collect events in small batches to hopefully reduce the
+ number of system calls.
+ */
+ constexpr unsigned MAX_EVENTS= 256;
+
+ io_event events[MAX_EVENTS];
+ for (;;)
+ {
+ switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
+ case -EINTR:
+ continue;
+ case -EINVAL:
+ if (shutdown_in_progress)
+ return;
+ /* fall through */
+ default:
+ if (ret < 0)
+ {
+ fprintf(stderr, "io_getevents returned %d\n", ret);
+ abort();
+ return;
+ }
+ for (int i= 0; i < ret; i++)
+ {
+ const io_event &event= events[i];
+ aiocb *iocb= static_cast<aiocb*>(event.obj);
+ if (static_cast<int>(event.res) < 0)
+ {
+ iocb->m_err= -event.res;
+ iocb->m_ret_len= 0;
+ }
+ else
+ {
+ iocb->m_ret_len= event.res;
+ iocb->m_err= 0;
+ finish_synchronous(iocb);
+ }
+ iocb->m_internal_task.m_func= iocb->m_callback;
+ iocb->m_internal_task.m_arg= iocb;
+ iocb->m_internal_task.m_group= iocb->m_group;
+ aio->m_pool->submit_task(&iocb->m_internal_task);
+ }
+ }
+ }
+ }
+
+public:
+ aio_linux(io_context_t ctx, thread_pool *pool)
+ : m_pool(pool), m_io_ctx(ctx),
+ m_getevent_thread(getevent_thread_routine, this)
+ {
+ }
+
+ ~aio_linux()
+ {
+ shutdown_in_progress= true;
+ io_destroy(m_io_ctx);
+ m_getevent_thread.join();
+ shutdown_in_progress= false;
+ }
+
+ int submit_io(aiocb *cb) override
+ {
+ io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
+ cb->m_offset);
+ if (cb->m_opcode != aio_opcode::AIO_PREAD)
+ cb->aio_lio_opcode= IO_CMD_PWRITE;
+ iocb *icb= static_cast<iocb*>(cb);
+ int ret= io_submit(m_io_ctx, 1, &icb);
+ if (ret == 1)
+ return 0;
+ errno= -ret;
+ return -1;
+ }
+
+ int bind(native_file_handle&) override { return 0; }
+ int unbind(const native_file_handle&) override { return 0; }
+};
+
+std::atomic<bool> aio_linux::shutdown_in_progress;
+
+aio *create_linux_aio(thread_pool *pool, int max_io)
+{
+ io_context_t ctx;
+ memset(&ctx, 0, sizeof ctx);
+ if (int ret= io_setup(max_io, &ctx))
+ {
+ fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
+ return nullptr;
+ }
+ return new aio_linux(ctx, pool);
+}
+}