From 06eaf7232e9a920468c0f8d74dcf2fe8b555501c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 14:24:36 +0200 Subject: Adding upstream version 1:10.11.6. Signed-off-by: Daniel Baumann --- tpool/CMakeLists.txt | 56 +++ tpool/aio_liburing.cc | 209 ++++++++++ tpool/aio_linux.cc | 189 +++++++++ tpool/aio_simulated.cc | 164 ++++++++ tpool/aio_win.cc | 139 +++++++ tpool/task.cc | 92 +++++ tpool/task_group.cc | 115 ++++++ tpool/tpool.h | 302 ++++++++++++++ tpool/tpool_generic.cc | 965 +++++++++++++++++++++++++++++++++++++++++++++ tpool/tpool_structs.h | 440 +++++++++++++++++++++ tpool/tpool_win.cc | 291 ++++++++++++++ tpool/wait_notification.cc | 25 ++ 12 files changed, 2987 insertions(+) create mode 100644 tpool/CMakeLists.txt create mode 100644 tpool/aio_liburing.cc create mode 100644 tpool/aio_linux.cc create mode 100644 tpool/aio_simulated.cc create mode 100644 tpool/aio_win.cc create mode 100644 tpool/task.cc create mode 100644 tpool/task_group.cc create mode 100644 tpool/tpool.h create mode 100644 tpool/tpool_generic.cc create mode 100644 tpool/tpool_structs.h create mode 100644 tpool/tpool_win.cc create mode 100644 tpool/wait_notification.cc (limited to 'tpool') diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt new file mode 100644 index 00000000..115e3d58 --- /dev/null +++ b/tpool/CMakeLists.txt @@ -0,0 +1,56 @@ +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR} ${PROJECT_SOURCE_DIR}/include) +IF(WIN32) + SET(EXTRA_SOURCES tpool_win.cc aio_win.cc) +ELSEIF(CMAKE_SYSTEM_NAME STREQUAL "Linux") + OPTION(WITH_URING "Require that io_uring be used" OFF) + OPTION(WITH_LIBAIO "Require that libaio is used, unless uring is there" OFF) + IF(WITH_URING) + SET(URING_REQUIRED REQUIRED) + ELSEIF(WITH_LIBAIO) + SET(LIBAIO_REQIRED REQUIRED) + ENDIF() + FIND_PACKAGE(URING QUIET ${URING_REQUIRED}) + IF(URING_FOUND) + SET(URING_FOUND ${URING_FOUND} PARENT_SCOPE) + SET(TPOOL_DEFINES "-DHAVE_URING" PARENT_SCOPE) + ADD_DEFINITIONS(-DHAVE_URING) + LINK_LIBRARIES(${URING_LIBRARIES}) + INCLUDE_DIRECTORIES(${URING_INCLUDE_DIRS}) + SET(EXTRA_SOURCES aio_liburing.cc) + SET(CMAKE_REQUIRED_INCLUDES_SAVE ${CMAKE_REQUIRED_INCLUDES}) + SET(CMAKE_REQUIRED_LIBRARIES_SAVE ${CMAKE_REQUIRED_LIBRARIES}) + SET(CMAKE_REQUIRED_INCLUDES ${URING_INCLUDE_DIRS}) + SET(CMAKE_REQUIRED_LIBRARIES ${URING_LIBRARIES}) + CHECK_SYMBOL_EXISTS(io_uring_mlock_size "liburing.h" HAVE_IO_URING_MLOCK_SIZE) + SET(CMAKE_REQUIRED_INCLUDES ${CMAKE_REQUIRED_INCLUDES_SAVE}) + SET(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES_SAVE}) + IF(HAVE_IO_URING_MLOCK_SIZE) + SET_SOURCE_FILES_PROPERTIES(aio_liburing.cc PROPERTIES COMPILE_FLAGS "-DHAVE_IO_URING_MLOCK_SIZE") + ENDIF() + ELSE() + FIND_PACKAGE(LIBAIO QUIET ${LIBAIO_REQUIRED}) + IF(LIBAIO_FOUND) + SET(TPOOL_DEFINES "-DLINUX_NATIVE_AIO" PARENT_SCOPE) + ADD_DEFINITIONS(-DLINUX_NATIVE_AIO) + INCLUDE_DIRECTORIES(${LIBAIO_INCLUDE_DIRS}) + LINK_LIBRARIES(${LIBAIO_LIBRARIES}) + SET(EXTRA_SOURCES aio_linux.cc) + ENDIF() + ENDIF() +ENDIF() + +ADD_LIBRARY(tpool STATIC + aio_simulated.cc + tpool_structs.h + CMakeLists.txt + tpool.h + tpool_generic.cc + task_group.cc + task.cc + wait_notification.cc + ${EXTRA_SOURCES} +) + +IF(URING_FOUND) + ADD_DEPENDENCIES(tpool GenError) +ENDIF() diff --git a/tpool/aio_liburing.cc b/tpool/aio_liburing.cc new file mode 100644 index 00000000..447c2335 --- /dev/null +++ b/tpool/aio_liburing.cc @@ -0,0 +1,209 @@ +/* Copyright (C) 2021, 2022, MariaDB Corporation. 
+ +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include "tpool.h" +#include "mysql/service_my_print_error.h" +#include "mysqld_error.h" + +#include + +#include +#include +#include +#include +#include + +namespace +{ + +class aio_uring final : public tpool::aio +{ +public: + aio_uring(tpool::thread_pool *tpool, int max_aio) : tpool_(tpool) + { + if (io_uring_queue_init(max_aio, &uring_, 0) != 0) + { + switch (const auto e= errno) { + case ENOMEM: + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_queue_init() failed with ENOMEM:" + " try larger memory locked limit, ulimit -l" + ", or https://mariadb.com/kb/en/systemd/#configuring-limitmemlock" + " under systemd" +#ifdef HAVE_IO_URING_MLOCK_SIZE + " (%zd bytes required)", ME_ERROR_LOG | ME_WARNING, + io_uring_mlock_size(max_aio, 0)); +#else + , ME_ERROR_LOG | ME_WARNING); +#endif + break; + case ENOSYS: + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_queue_init() failed with ENOSYS:" + " check seccomp filters, and the kernel version " + "(newer than 5.1 required)", + ME_ERROR_LOG | ME_WARNING); + break; + default: + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_queue_init() failed with errno %d", + ME_ERROR_LOG | ME_WARNING, e); + } + throw std::runtime_error("aio_uring()"); + } + if (io_uring_ring_dontfork(&uring_) != 0) + { + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_dontfork() failed with errno %d (continuing)", + ME_ERROR_LOG | ME_WARNING, errno); + } + + thread_= std::thread(thread_routine, this); + } + + ~aio_uring() noexcept + { + { + std::lock_guard _(mutex_); + io_uring_sqe *sqe= io_uring_get_sqe(&uring_); + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, nullptr); + auto ret= io_uring_submit(&uring_); + if (ret != 1) + { + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_submit() returned %d during shutdown:" + " this may cause a hang\n", + ME_ERROR_LOG | ME_FATAL, ret); + abort(); + } + } + thread_.join(); + io_uring_queue_exit(&uring_); + } + + int submit_io(tpool::aiocb *cb) final + { + cb->iov_base= cb->m_buffer; + cb->iov_len= cb->m_len; + + // The whole operation since io_uring_get_sqe() and till io_uring_submit() + // must be atomical. This is because liburing provides thread-unsafe calls. + std::lock_guard _(mutex_); + + io_uring_sqe *sqe= io_uring_get_sqe(&uring_); + if (cb->m_opcode == tpool::aio_opcode::AIO_PREAD) + io_uring_prep_readv(sqe, cb->m_fh, static_cast(cb), 1, + cb->m_offset); + else + io_uring_prep_writev(sqe, cb->m_fh, static_cast(cb), 1, + cb->m_offset); + io_uring_sqe_set_data(sqe, cb); + + return io_uring_submit(&uring_) == 1 ? 
0 : -1; + } + + int bind(native_file_handle &fd) final + { + std::lock_guard _(files_mutex_); + auto it= std::lower_bound(files_.begin(), files_.end(), fd); + assert(it == files_.end() || *it != fd); + files_.insert(it, fd); + return io_uring_register_files_update(&uring_, 0, files_.data(), + files_.size()); + } + + int unbind(const native_file_handle &fd) final + { + std::lock_guard _(files_mutex_); + auto it= std::lower_bound(files_.begin(), files_.end(), fd); + assert(*it == fd); + files_.erase(it); + return io_uring_register_files_update(&uring_, 0, files_.data(), + files_.size()); + } + +private: + static void thread_routine(aio_uring *aio) + { + for (;;) + { + io_uring_cqe *cqe; + if (int ret= io_uring_wait_cqe(&aio->uring_, &cqe)) + { + if (ret == -EINTR) + continue; + my_printf_error(ER_UNKNOWN_ERROR, + "io_uring_wait_cqe() returned %d\n", + ME_ERROR_LOG | ME_FATAL, ret); + abort(); + } + + auto *iocb= static_cast(io_uring_cqe_get_data(cqe)); + if (!iocb) + break; // ~aio_uring() told us to terminate + + int res= cqe->res; + if (res < 0) + { + iocb->m_err= -res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_err= 0; + iocb->m_ret_len= res; + } + + io_uring_cqe_seen(&aio->uring_, cqe); + finish_synchronous(iocb); + + // If we need to resubmit the IO operation, but the ring is full, + // we will follow the same path as for any other error codes. + if (res == -EAGAIN && !aio->submit_io(iocb)) + continue; + + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->tpool_->submit_task(&iocb->m_internal_task); + } + } + + io_uring uring_; + std::mutex mutex_; + tpool::thread_pool *tpool_; + std::thread thread_; + + std::vector files_; + std::mutex files_mutex_; +}; + +} // namespace + +namespace tpool +{ + +aio *create_linux_aio(thread_pool *pool, int max_aio) +{ + try { + return new aio_uring(pool, max_aio); + } catch (std::runtime_error& error) { + return nullptr; + } +} + +} // namespace tpool diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc new file mode 100644 index 00000000..507c6b92 --- /dev/null +++ b/tpool/aio_linux.cc @@ -0,0 +1,189 @@ +/* Copyright (C) 2019, 2020, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include "tpool.h" + +# include +# include +# include +# include +# include + +/** + Invoke the io_getevents() system call, without timeout parameter. + + @param ctx context from io_setup() + @param min_nr minimum number of completion events to wait for + @param nr maximum number of completion events to collect + @param ev the collected events + + In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc + the io_getevents() implementation in libaio was "optimized" so that it + would elide the system call when there are no outstanding requests + and a timeout was specified. 
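  Roughly, that optimized fast path looks like this (a simplified
  sketch for illustration only, not the actual libaio source):

    int io_getevents(io_context_t ctx, long min_nr, long nr,
                     io_event *ev, timespec *timeout)
    {
      struct aio_ring *ring= (struct aio_ring *) ctx; // ring shared with kernel
      if (ring->magic == AIO_RING_MAGIC &&
          ring->head == ring->tail &&                 // nothing completed
          timeout)
        return 0;                                     // syscall elided
      return syscall(__NR_io_getevents, ctx, min_nr, nr, ev, timeout);
    }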
+ + The libaio code for dereferencing ctx would occasionally trigger + SIGSEGV if io_destroy() was concurrently invoked from another thread. + Hence, we have to use the raw system call. + + WHY are we doing this at all? + Because we want io_destroy() from another thread to interrupt io_getevents(). + + And, WHY do we want io_destroy() from another thread to interrupt + io_getevents()? + + Because there is no documented, libaio-friendly and race-condition-free way to + interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work + for us so far. + + Historical note : in the past, we used io_getevents with timeouts. We'd wake + up periodically, check for shutdown flag, return from the main routine. + This was admittedly safer, yet it did cost periodic wakeups, which we are not + willing to do anymore. + + @note we also rely on the undocumented property, that io_destroy(ctx) + will make this version of io_getevents return EINVAL. +*/ +static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev) +{ + int saved_errno= errno; + int ret= syscall(__NR_io_getevents, reinterpret_cast(ctx), + min_nr, nr, ev, 0); + if (ret < 0) + { + ret= -errno; + errno= saved_errno; + } + return ret; +} + + +/* + Linux AIO implementation, based on native AIO. + Needs libaio.h and -laio at the compile time. + + io_submit() is used to submit async IO. + + A single thread will collect the completion notification + with io_getevents() and forward io completion callback to + the worker threadpool. +*/ +namespace tpool +{ + +class aio_linux final : public aio +{ + thread_pool *m_pool; + io_context_t m_io_ctx; + std::thread m_getevent_thread; + static std::atomic shutdown_in_progress; + + static void getevent_thread_routine(aio_linux *aio) + { + /* + We collect events in small batches to hopefully reduce the + number of system calls. 
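  (For instance, a burst of 1000 completions is drained in
  ceil(1000 / 256) = 4 wakeups rather than up to 1000 system calls.)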
+ */ + constexpr unsigned MAX_EVENTS= 256; + + io_event events[MAX_EVENTS]; + for (;;) + { + switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) { + case -EINTR: + continue; + case -EINVAL: + if (shutdown_in_progress) + return; + /* fall through */ + default: + if (ret < 0) + { + fprintf(stderr, "io_getevents returned %d\n", ret); + abort(); + return; + } + for (int i= 0; i < ret; i++) + { + const io_event &event= events[i]; + aiocb *iocb= static_cast(event.obj); + if (static_cast(event.res) < 0) + { + iocb->m_err= -event.res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_ret_len= event.res; + iocb->m_err= 0; + finish_synchronous(iocb); + } + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->m_pool->submit_task(&iocb->m_internal_task); + } + } + } + } + +public: + aio_linux(io_context_t ctx, thread_pool *pool) + : m_pool(pool), m_io_ctx(ctx), + m_getevent_thread(getevent_thread_routine, this) + { + } + + ~aio_linux() + { + shutdown_in_progress= true; + io_destroy(m_io_ctx); + m_getevent_thread.join(); + shutdown_in_progress= false; + } + + int submit_io(aiocb *cb) override + { + io_prep_pread(static_cast(cb), cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + if (cb->m_opcode != aio_opcode::AIO_PREAD) + cb->aio_lio_opcode= IO_CMD_PWRITE; + iocb *icb= static_cast(cb); + int ret= io_submit(m_io_ctx, 1, &icb); + if (ret == 1) + return 0; + errno= -ret; + return -1; + } + + int bind(native_file_handle&) override { return 0; } + int unbind(const native_file_handle&) override { return 0; } +}; + +std::atomic aio_linux::shutdown_in_progress; + +aio *create_linux_aio(thread_pool *pool, int max_io) +{ + io_context_t ctx; + memset(&ctx, 0, sizeof ctx); + if (int ret= io_setup(max_io, &ctx)) + { + fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret); + return nullptr; + } + return new aio_linux(ctx, pool); +} +} diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc new file mode 100644 index 00000000..4bc58c29 --- /dev/null +++ b/tpool/aio_simulated.cc @@ -0,0 +1,164 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#ifndef _WIN32 +#include /* pread(), pwrite() */ +#endif +#include "tpool.h" +#include "tpool_structs.h" +#include +#include + +namespace tpool +{ +#ifdef _WIN32 + +/* + In order to be able to execute synchronous IO even on file opened + with FILE_FLAG_OVERLAPPED, and to bypass to completion port, + we use valid event handle for the hEvent member of the OVERLAPPED structure, + with its low-order bit set. + + See MSDN docs for GetQueuedCompletionStatus() for description of this trick. 
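  Condensed, the pread()/pwrite() helpers below boil down to this
  (a sketch; h, buf and count as in the functions that follow):

    OVERLAPPED ov{};
    HANDLE ev= CreateEventA(NULL, FALSE, FALSE, NULL);
    ov.hEvent= (HANDLE) ((uintptr_t) ev | 1); // low bit set: completion is
                                              // not posted to the IOCP
    if (ReadFile(h, buf, (DWORD) count, NULL, &ov) ||
        GetLastError() == ERROR_IO_PENDING)
    {
      DWORD n_bytes;
      if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) // wait inline
        return n_bytes;
    }
    return -1;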
+*/ +static DWORD fls_sync_io= FLS_OUT_OF_INDEXES; +HANDLE win_get_syncio_event() +{ + HANDLE h; + + h= (HANDLE) FlsGetValue(fls_sync_io); + if (h) + { + return h; + } + h= CreateEventA(NULL, FALSE, FALSE, NULL); + /* Set low-order bit to keeps I/O completion from being queued */ + h= (HANDLE)((uintptr_t) h | 1); + FlsSetValue(fls_sync_io, h); + return h; +} +#include +static void __stdcall win_free_syncio_event(void *data) +{ + if (data) + { + CloseHandle((HANDLE) data); + } +} + +struct WinIoInit +{ + WinIoInit() + { + fls_sync_io= FlsAlloc(win_free_syncio_event); + if(fls_sync_io == FLS_OUT_OF_INDEXES) + abort(); + } + ~WinIoInit() { FlsFree(fls_sync_io); } +}; + +static WinIoInit win_io_init; + + +SSIZE_T pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + if (count > 0xFFFFFFFF) + count= 0xFFFFFFFF; + + if (ReadFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + + return -1; +} + +SSIZE_T pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + if (count > 0xFFFFFFFF) + count= 0xFFFFFFFF; + if (WriteFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + return -1; +} +#endif + +/** + Simulated AIO. + + Executes IO synchronously in worker pool + and then calls the completion routine. +*/ +class simulated_aio : public aio +{ + thread_pool *m_pool; + +public: + simulated_aio(thread_pool *tp) + : m_pool(tp) + { + } + + static void simulated_aio_callback(void *param) + { + aiocb *cb= (aiocb *) param; + synchronous(cb); + cb->m_internal_task.m_func= cb->m_callback; + thread_pool *pool= (thread_pool *)cb->m_internal; + pool->submit_task(&cb->m_internal_task); + } + + virtual int submit_io(aiocb *aiocb) override + { + aiocb->m_internal_task.m_func = simulated_aio_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + aiocb->m_internal = m_pool; + m_pool->submit_task(&aiocb->m_internal_task); + return 0; + } + + virtual int bind(native_file_handle &fd) override { return 0; } + virtual int unbind(const native_file_handle &fd) override { return 0; } +}; + +aio *create_simulated_aio(thread_pool *tp) +{ + return new simulated_aio(tp); +} + +} // namespace tpool diff --git a/tpool/aio_win.cc b/tpool/aio_win.cc new file mode 100644 index 00000000..b44f705b --- /dev/null +++ b/tpool/aio_win.cc @@ -0,0 +1,139 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace tpool +{ + +/* + Windows AIO implementation, completion port based. + A single thread collects the completion notification with + GetQueuedCompletionStatus(), and forwards io completion callback + the worker threadpool +*/ +class tpool_generic_win_aio : public aio +{ + /* Thread that does collects completion status from the completion port. */ + std::thread m_thread; + + /* IOCP Completion port.*/ + HANDLE m_completion_port; + + /* The worker pool where completion routine is executed, as task. */ + thread_pool* m_pool; +public: + tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool) + { + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); + m_thread = std::thread(aio_completion_thread_proc, this); + } + + /** + Task to be executed in the work pool. + */ + static void io_completion_task(void* data) + { + auto cb = (aiocb*)data; + cb->execute_callback(); + } + + void completion_thread_work() + { + for (;;) + { + DWORD n_bytes; + aiocb* aiocb; + ULONG_PTR key; + if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key, + (LPOVERLAPPED*)& aiocb, INFINITE)) + break; + + aiocb->m_err = 0; + aiocb->m_ret_len = n_bytes; + + if (n_bytes != aiocb->m_len) + { + if (GetOverlappedResult(aiocb->m_fh, aiocb, + (LPDWORD)& aiocb->m_ret_len, FALSE)) + { + aiocb->m_err = GetLastError(); + } + } + aiocb->m_internal_task.m_func = aiocb->m_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + m_pool->submit_task(&aiocb->m_internal_task); + } + } + + static void aio_completion_thread_proc(tpool_generic_win_aio* aio) + { + aio->completion_thread_work(); + } + + ~tpool_generic_win_aio() + { + if (m_completion_port) + CloseHandle(m_completion_port); + m_thread.join(); + } + + virtual int submit_io(aiocb* cb) override + { + memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED)); + cb->m_internal = this; + ULARGE_INTEGER uli; + uli.QuadPart = cb->m_offset; + cb->Offset = uli.LowPart; + cb->OffsetHigh = uli.HighPart; + + BOOL ok; + if (cb->m_opcode == aio_opcode::AIO_PREAD) + ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + else + ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + + if (ok || (GetLastError() == ERROR_IO_PENDING)) + return 0; + return -1; + } + + // Inherited via aio + virtual int bind(native_file_handle& fd) override + { + return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0 + : GetLastError(); + } + virtual int unbind(const native_file_handle& fd) override { return 0; } +}; + +aio* create_win_aio(thread_pool* pool, int max_io) +{ + return new tpool_generic_win_aio(pool, max_io); +} + +} // namespace tpool diff --git a/tpool/task.cc b/tpool/task.cc new file mode 100644 index 00000000..81ec8859 --- /dev/null +++ b/tpool/task.cc @@ -0,0 +1,92 @@ +/* Copyright (C) 2019, 2021, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+ +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include +#include +#include +#include +#include + +namespace tpool +{ + task::task(callback_func func, void* arg, task_group* group) : + m_func(func), m_arg(arg), m_group(group) {} + + void task::execute() + { + if (m_group) + { + /* Executing in a group (limiting concurrency).*/ + m_group->execute(this); + } + else + { + /* Execute directly. */ + m_func(m_arg); + release(); + } + } + + /* Task that provide wait() operation. */ + waitable_task::waitable_task(callback_func func, void* arg, task_group* group) : + task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){} + + void waitable_task::add_ref() + { + std::unique_lock lk(m_mtx); + m_ref_count++; + } + + void waitable_task::release() + { + std::unique_lock lk(m_mtx); + m_ref_count--; + if (!m_ref_count && m_waiter_count) + m_cv.notify_all(); + } + void waitable_task::wait(std::unique_lock& lk) + { + m_waiter_count++; + while (m_ref_count) + m_cv.wait(lk); + m_waiter_count--; + } + void waitable_task::wait() + { + std::unique_lock lk(m_mtx); + wait(lk); + } + + static void noop(void*) + { + } + void waitable_task::disable() + { + std::unique_lock lk(m_mtx); + if (m_func == noop) + return; + wait(lk); + m_original_func = m_func; + m_func = noop; + } + void waitable_task::enable() + { + std::unique_lock lk(m_mtx); + if(m_func != noop) + return; + wait(lk); + m_func = m_original_func; + } +} diff --git a/tpool/task_group.cc b/tpool/task_group.cc new file mode 100644 index 00000000..eb57a8be --- /dev/null +++ b/tpool/task_group.cc @@ -0,0 +1,115 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include // usleep +#endif +namespace tpool +{ + + /** + Task_group constructor + + @param max_threads - maximum number of threads allowed to execute + tasks from the group at the same time. + + @param enable_task_release - if true (default), task::release() will be + called after task execution.'false' should only be used in rare cases + when accessing memory, pointed by task structures, would be unsafe after. + the callback. 
Also 'false' is only possible ,if task::release() is a trivial function + */ + task_group::task_group(unsigned int max_concurrency, + bool enable_task_release) + : + m_queue(8), + m_mtx(), + m_tasks_running(), + m_max_concurrent_tasks(max_concurrency), + m_enable_task_release(enable_task_release) + {}; + + void task_group::set_max_tasks(unsigned int max_concurrency) + { + std::unique_lock lk(m_mtx); + m_max_concurrent_tasks = max_concurrency; + } + void task_group::execute(task* t) + { + std::unique_lock lk(m_mtx); + if (m_tasks_running == m_max_concurrent_tasks) + { + /* Queue for later execution by another thread.*/ + m_queue.push(t); + return; + } + m_tasks_running++; + for (;;) + { + lk.unlock(); + if (t) + { + t->m_func(t->m_arg); + if (m_enable_task_release) + t->release(); + } + lk.lock(); + + if (m_queue.empty()) + break; + t = m_queue.front(); + m_queue.pop(); + } + m_tasks_running--; + } + + void task_group::cancel_pending(task* t) + { + std::unique_lock lk(m_mtx); + if (!t) + m_queue.clear(); + for (auto it = m_queue.begin(); it != m_queue.end(); it++) + { + if (*it == t) + { + (*it)->release(); + (*it) = nullptr; + } + } + } + + task_group::~task_group() + { + std::unique_lock lk(m_mtx); + assert(m_queue.empty()); + + while (m_tasks_running) + { + lk.unlock(); +#ifndef _WIN32 + usleep(1000); +#else + Sleep(1); +#endif + lk.lock(); + } + } +} diff --git a/tpool/tpool.h b/tpool/tpool.h new file mode 100644 index 00000000..2f87bcd4 --- /dev/null +++ b/tpool/tpool.h @@ -0,0 +1,302 @@ +/* Copyright (C) 2019, 2021, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include /* unique_ptr */ +#include +#include +#include +#include +#ifdef LINUX_NATIVE_AIO +#include +#endif +#ifdef HAVE_URING +#include +#endif +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include +/** + Windows-specific native file handle struct. + Apart from the actual handle, contains PTP_IO + used by the Windows threadpool. +*/ +struct native_file_handle +{ + HANDLE m_handle; + PTP_IO m_ptp_io; + native_file_handle(){}; + native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {} + operator HANDLE() const { return m_handle; } +}; +#else +#include +typedef int native_file_handle; +#endif + +namespace tpool +{ +/** + Task callback function + */ +typedef void (*callback_func)(void *); +typedef void (*callback_func_np)(void); +class task; + +/** A class that can be used e.g. for +restricting concurrency for specific class of tasks. 
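  For example, to ensure that at most one instance of a callback runs
  at any time (a usage sketch; pool, work_func and the args are
  illustration only):

    static tpool::task_group serialize_group(1);
    static tpool::task t1(work_func, &arg1, &serialize_group);
    static tpool::task t2(work_func, &arg2, &serialize_group);
    pool->submit_task(&t1);  // t1 and t2 never execute concurrently;
    pool->submit_task(&t2);  // the later one queues inside the group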
*/ + +class task_group +{ +private: + circular_queue m_queue; + std::mutex m_mtx; + std::condition_variable m_cv; + unsigned int m_tasks_running; + unsigned int m_max_concurrent_tasks; + const bool m_enable_task_release; + +public: + task_group(unsigned int max_concurrency= 100000, bool m_enable_task_release= true); + void set_max_tasks(unsigned int max_concurrent_tasks); + void execute(task* t); + void cancel_pending(task *t); + ~task_group(); +}; + + +class task +{ +public: + callback_func m_func; + void *m_arg; + task_group* m_group; + virtual void add_ref() {}; + virtual void release() {}; + task() {}; + task(callback_func func, void* arg, task_group* group = nullptr); + void* get_arg() { return m_arg; } + callback_func get_func() { return m_func; } + virtual void execute(); + virtual ~task() {} +}; + +class waitable_task :public task +{ + std::mutex m_mtx; + std::condition_variable m_cv; + int m_ref_count; + int m_waiter_count; + callback_func m_original_func; + void wait(std::unique_lock&lk); +public: + waitable_task(callback_func func, void* arg, task_group* group = nullptr); + void add_ref() override; + void release() override; + TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; } + TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;} + void wait(); + void disable(); + void enable(); + virtual ~waitable_task() {}; +}; +enum class aio_opcode +{ + AIO_PREAD, + AIO_PWRITE +}; +constexpr size_t MAX_AIO_USERDATA_LEN= 4 * sizeof(void*); + +/** IO control block, includes parameters for the IO, and the callback*/ + +struct aiocb +#ifdef _WIN32 + :OVERLAPPED +#elif defined LINUX_NATIVE_AIO + :iocb +#elif defined HAVE_URING + :iovec +#endif +{ + native_file_handle m_fh; + aio_opcode m_opcode; + unsigned long long m_offset; + void *m_buffer; + unsigned int m_len; + callback_func m_callback; + task_group* m_group; + /* Returned length and error code*/ + size_t m_ret_len; + int m_err; + void *m_internal; + task m_internal_task; + alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN]; + + aiocb() : m_internal_task(nullptr, nullptr) + {} + void execute_callback() + { + task t(m_callback, this,m_group); + t.execute(); + } +}; + + +/** + AIO interface +*/ +class aio +{ +public: + /** + Submit asynchronous IO. + On completion, cb->m_callback is executed. 
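  A hypothetical call site (fd, buf, pool and io_done are illustration
  only):

    static void io_done(void *arg)
    {
      auto cb= static_cast<tpool::aiocb*>(arg);
      // on success, cb->m_err == 0 and cb->m_ret_len bytes were read
    }

    tpool::aiocb cb;
    cb.m_fh= fd;
    cb.m_opcode= tpool::aio_opcode::AIO_PREAD;
    cb.m_offset= 0;
    cb.m_buffer= buf;
    cb.m_len= 4096;
    cb.m_callback= io_done;
    cb.m_group= nullptr;
    int err= pool->submit_io(&cb);  // 0 on success; io_done runs later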
+ */ + virtual int submit_io(aiocb *cb)= 0; + /** "Bind" file to AIO handler (used on Windows only) */ + virtual int bind(native_file_handle &fd)= 0; + /** "Unind" file to AIO handler (used on Windows only) */ + virtual int unbind(const native_file_handle &fd)= 0; + virtual ~aio(){}; +protected: + static void synchronous(aiocb *cb); + /** finish a partial read/write callback synchronously */ + static inline void finish_synchronous(aiocb *cb) + { + if (!cb->m_err && cb->m_ret_len != cb->m_len) + { + /* partial read/write */ + cb->m_buffer= (char *) cb->m_buffer + cb->m_ret_len; + cb->m_len-= (unsigned int) cb->m_ret_len; + cb->m_offset+= cb->m_ret_len; + synchronous(cb); + } + } +}; + +class timer +{ +public: + virtual void set_time(int initial_delay_ms, int period_ms) = 0; + virtual void disarm() = 0; + virtual ~timer(){} +}; + +class thread_pool; + +extern aio *create_simulated_aio(thread_pool *tp); + +class thread_pool +{ +protected: + /* AIO handler */ + std::unique_ptr m_aio; + virtual aio *create_native_aio(int max_io)= 0; + + /** + Functions to be called at worker thread start/end + can be used for example to set some TLS variables + */ + void (*m_worker_init_callback)(void); + void (*m_worker_destroy_callback)(void); + +public: + thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback() + { + } + virtual void submit_task(task *t)= 0; + virtual timer* create_timer(callback_func func, void *data=nullptr) = 0; + void set_thread_callbacks(void (*init)(), void (*destroy)()) + { + m_worker_init_callback= init; + m_worker_destroy_callback= destroy; + } + int configure_aio(bool use_native_aio, int max_io) + { + if (use_native_aio) + m_aio.reset(create_native_aio(max_io)); + else + m_aio.reset(create_simulated_aio(this)); + return !m_aio ? -1 : 0; + } + + int reconfigure_aio(bool use_native_aio, int max_io) + { + assert(m_aio); + if (use_native_aio) + { + auto new_aio = create_native_aio(max_io); + if (!new_aio) + return -1; + m_aio.reset(new_aio); + } + return 0; + } + + void disable_aio() + { + m_aio.reset(); + } + + /** + Tweaks how fast worker threads are created, or how often they are signaled. + + @param threads - desired number of concurrently active threads + Special value 0 means default. Not the same as max number of threads + in the pool - oversubscription is allowed and stalls are still detected + + @note + It is designed to use with "batch" operations, where huge number + of tasks is submitted in rapid succession. In this case, it is + better to temporarily restrict concurrency, which will make thread + creation throttling more aggressive. + Once the batch is over, restore default concurrency + by calling set_concurrency(0). 
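  E.g. a batch submitter might do (a sketch; pool and batch are
  illustration only):

    pool->set_concurrency(1);   // make thread creation very lazy
    for (tpool::task *t : batch)
      pool->submit_task(t);
    pool->set_concurrency(0);   // restore the default concurrency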
+ */ + virtual void set_concurrency(unsigned int threads=0){} + + int bind(native_file_handle &fd) { return m_aio->bind(fd); } + void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); } + int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } + virtual void wait_begin() {}; + virtual void wait_end() {}; + virtual ~thread_pool() {} +}; +const int DEFAULT_MIN_POOL_THREADS= 1; +const int DEFAULT_MAX_POOL_THREADS= 500; +extern thread_pool * +create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); +extern "C" void tpool_wait_begin(); +extern "C" void tpool_wait_end(); +#ifdef _WIN32 +extern thread_pool * +create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); + +/* + Helper functions, to execute pread/pwrite even if file is + opened with FILE_FLAG_OVERLAPPED, and bound to completion + port. +*/ +SSIZE_T pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +SSIZE_T pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +HANDLE win_get_syncio_event(); +#endif +} // namespace tpool diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc new file mode 100644 index 00000000..fd97b446 --- /dev/null +++ b/tpool/tpool_generic.cc @@ -0,0 +1,965 @@ +/* Copyright (C) 2019, 2022, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "tpool.h" +#include +#include +#include +#include +#include +#include "aligned.h" + +namespace tpool +{ + +#ifdef __linux__ +#if defined(HAVE_URING) || defined(LINUX_NATIVE_AIO) + extern aio* create_linux_aio(thread_pool* tp, int max_io); +#else + aio *create_linux_aio(thread_pool *, int) { return nullptr; }; +#endif +#endif +#ifdef _WIN32 + extern aio* create_win_aio(thread_pool* tp, int max_io); +#endif + + static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500); + static const int OVERSUBSCRIBE_FACTOR = 2; + +/** + Process the cb synchronously +*/ +void aio::synchronous(aiocb *cb) +{ +#ifdef _WIN32 + size_t ret_len; +#else + ssize_t ret_len; +#endif + int err= 0; + switch (cb->m_opcode) + { + case aio_opcode::AIO_PREAD: + ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + case aio_opcode::AIO_PWRITE: + ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + default: + abort(); + } +#ifdef _WIN32 + if (static_cast(ret_len) < 0) + err= GetLastError(); +#else + if (ret_len < 0) + { + err= errno; + ret_len= 0; + } +#endif + cb->m_ret_len = ret_len; + cb->m_err = err; + if (ret_len) + finish_synchronous(cb); +} + + +/** + Implementation of generic threadpool. 
+ This threadpool consists of the following components + + - The task queue. This queue is populated by submit() + - Worker that execute the work items. + - Timer thread that takes care of pool health + + The task queue is populated by submit() method. + on submit(), a worker thread can be woken, or created + to execute tasks. + + The timer thread watches if work items are being dequeued, and if not, + this can indicate potential deadlock. + Thus the timer thread can also wake or create a thread, to ensure some progress. + + Optimizations: + + - worker threads that are idle for long time will shutdown. + - worker threads are woken in LIFO order, which minimizes context switching + and also ensures that idle timeout works well. LIFO wakeup order ensures + that active threads stay active, and idle ones stay idle. + +*/ + +/** + Worker wakeup flags. +*/ +enum worker_wake_reason +{ + WAKE_REASON_NONE, + WAKE_REASON_TASK, + WAKE_REASON_SHUTDOWN +}; + + + +/* A per-worker thread structure.*/ +struct alignas(CPU_LEVEL1_DCACHE_LINESIZE) worker_data +{ + /** Condition variable to wakeup this worker.*/ + std::condition_variable m_cv; + + /** Reason why worker was woken. */ + worker_wake_reason m_wake_reason; + + /** + If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute. + */ + task* m_task; + + /** Struct is member of intrusive doubly linked list */ + worker_data* m_prev; + worker_data* m_next; + + /* Current state of the worker.*/ + enum state + { + NONE = 0, + EXECUTING_TASK = 1, + LONG_TASK = 2, + WAITING = 4 + }; + + int m_state; + + bool is_executing_task() + { + return m_state & EXECUTING_TASK; + } + bool is_long_task() + { + return m_state & LONG_TASK; + } + bool is_waiting() + { + return m_state & WAITING; + } + std::chrono::system_clock::time_point m_task_start_time; + worker_data() : + m_cv(), + m_wake_reason(WAKE_REASON_NONE), + m_task(), + m_prev(), + m_next(), + m_state(NONE), + m_task_start_time() + {} + + /*Define custom new/delete because of overaligned structure. */ + static void *operator new(size_t size) + { + return aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE); + } + static void operator delete(void* p) + { + aligned_free(p); + } +}; + + +static thread_local worker_data* tls_worker_data; + +class thread_pool_generic : public thread_pool +{ + /** Cache for per-worker structures */ + cache m_thread_data_cache; + + /** The task queue */ + circular_queue m_task_queue; + + /** List of standby (idle) workers */ + doubly_linked_list m_standby_threads; + + /** List of threads that are executing tasks */ + doubly_linked_list m_active_threads; + + /* Mutex that protects the whole struct, most importantly + the standby threads list, and task queue */ + std::mutex m_mtx; + + /** Timeout after which idle worker shuts down */ + std::chrono::milliseconds m_thread_timeout; + + /** How often should timer wakeup.*/ + std::chrono::milliseconds m_timer_interval; + + /** Another condition variable, used in pool shutdown */ + std::condition_variable m_cv_no_threads; + + /** Condition variable for the timer thread. Signaled on shutdown. */ + std::condition_variable m_cv_timer; + + /** Overall number of enqueues*/ + unsigned long long m_tasks_enqueued; + unsigned long long m_group_enqueued; + /** Overall number of dequeued tasks. 
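  (maintenance() compares m_tasks_dequeued + m_wakeups across timer
  ticks; if the sum has not advanced while tasks are still queued, the
  pool is considered stalled and add_thread() is invoked.)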
*/ + unsigned long long m_tasks_dequeued; + + /** Statistic related, number of worker thread wakeups */ + int m_wakeups; + + /** + Statistic related, number of spurious thread wakeups + (i.e thread woke up, and the task queue is empty) + */ + int m_spurious_wakeups; + + /** The desired concurrency. This number of workers should be + actively executing. */ + unsigned int m_concurrency; + + /** True, if threadpool is being shutdown, false otherwise */ + bool m_in_shutdown; + + /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/ + enum class timer_state_t + { + OFF, ON + }; + timer_state_t m_timer_state= timer_state_t::OFF; + void switch_timer(timer_state_t state); + + /* Updates idle_since, and maybe switches the timer off */ + void check_idle(std::chrono::system_clock::time_point now); + + /** time point when timer last ran, used as a coarse clock. */ + std::chrono::system_clock::time_point m_timestamp; + + /** Number of long running tasks. The long running tasks are excluded when + adjusting concurrency */ + unsigned int m_long_tasks_count; + + unsigned int m_waiting_task_count; + + /** Last time thread was created*/ + std::chrono::system_clock::time_point m_last_thread_creation; + + /** Minimumum number of threads in this pool.*/ + unsigned int m_min_threads; + + /** Maximimum number of threads in this pool. */ + unsigned int m_max_threads; + + /* maintenance related statistics (see maintenance()) */ + size_t m_last_thread_count; + unsigned long long m_last_activity; + std::atomic_flag m_thread_creation_pending= ATOMIC_FLAG_INIT; + + void worker_main(worker_data *thread_data); + void worker_end(worker_data* thread_data); + + /* Checks threadpool responsiveness, adjusts thread_counts */ + void maintenance(); + static void maintenance_func(void* arg) + { + ((thread_pool_generic *)arg)->maintenance(); + } + bool add_thread(); + bool wake(worker_wake_reason reason, task *t = nullptr); + void maybe_wake_or_create_thread(); + bool too_many_active_threads(); + bool get_task(worker_data *thread_var, task **t); + bool wait_for_tasks(std::unique_lock &lk, + worker_data *thread_var); + void cancel_pending(task* t); + + size_t thread_count() + { + return m_active_threads.size() + m_standby_threads.size(); + } +public: + thread_pool_generic(int min_threads, int max_threads); + ~thread_pool_generic(); + void wait_begin() override; + void wait_end() override; + void submit_task(task *task) override; + virtual aio *create_native_aio(int max_io) override + { +#ifdef _WIN32 + return create_win_aio(this, max_io); +#elif defined(__linux__) + return create_linux_aio(this,max_io); +#else + return nullptr; +#endif + } + + class timer_generic : public thr_timer_t, public timer + { + thread_pool_generic* m_pool; + waitable_task m_task; + callback_func m_callback; + void* m_data; + int m_period; + std::mutex m_mtx; + bool m_on; + std::atomic m_running; + + void run() + { + /* + In rare cases, multiple callbacks can be scheduled, + at the same time,. e.g with set_time(0,0) in a loop. + We do not allow parallel execution, since it is against the expectations. 
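  The atomic counter below implements the usual "collapse concurrent
  requests into a single runner" idiom; annotated, the logic reads:

    if (m_running.fetch_add(1) > 0)      // a runner already exists,
      return;                            // it will observe our increment
    do
      m_callback(m_data);                // one run per observed request
    while (m_running.fetch_sub(1) != 1); // new requests arrived: repeat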
+ */ + if (m_running.fetch_add(1, std::memory_order_acquire) > 0) + return; + do + { + m_callback(m_data); + } + while (m_running.fetch_sub(1, std::memory_order_release) != 1); + + if (m_pool && m_period) + { + std::unique_lock lk(m_mtx); + if (m_on) + { + DBUG_PUSH_EMPTY; + thr_timer_end(this); + thr_timer_settime(this, 1000ULL * m_period); + DBUG_POP_EMPTY; + } + } + } + + static void execute(void* arg) + { + auto timer = (timer_generic*)arg; + timer->run(); + } + + static void submit_task(void* arg) + { + timer_generic* timer = (timer_generic*)arg; + timer->m_pool->submit_task(&timer->m_task); + } + + public: + timer_generic(callback_func func, void* data, thread_pool_generic * pool): + m_pool(pool), + m_task(timer_generic::execute,this), + m_callback(func),m_data(data),m_period(0),m_mtx(), + m_on(true),m_running() + { + if (pool) + { + /* EXecute callback in threadpool*/ + thr_timer_init(this, submit_task, this); + } + else + { + /* run in "timer" thread */ + thr_timer_init(this, m_task.get_func(), m_task.get_arg()); + } + } + + void set_time(int initial_delay_ms, int period_ms) override + { + std::unique_lock lk(m_mtx); + if (!m_on) + return; + thr_timer_end(this); + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + thr_timer_settime(this, 1000ULL * initial_delay_ms); + } + + /* + Change only period of a periodic timer + (after the next execution). Workarounds + mysys timer deadlocks + */ + void set_period(int period_ms) + { + std::unique_lock lk(m_mtx); + if (!m_on) + return; + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + } + + void disarm() override + { + std::unique_lock lk(m_mtx); + m_on = false; + thr_timer_end(this); + lk.unlock(); + + if (m_task.m_group) + { + m_task.m_group->cancel_pending(&m_task); + } + if (m_pool) + { + m_pool->cancel_pending(&m_task); + } + m_task.wait(); + } + + virtual ~timer_generic() + { + disarm(); + } + }; + timer_generic m_maintenance_timer; + virtual timer* create_timer(callback_func func, void *data) override + { + return new timer_generic(func, data, this); + } + void set_concurrency(unsigned int concurrency=0) override; +}; + +void thread_pool_generic::cancel_pending(task* t) +{ + std::unique_lock lk(m_mtx); + for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++) + { + if (*it == t) + { + t->release(); + *it = nullptr; + } + } +} +/** + Register worker in standby list, and wait to be woken. + + @retval true if thread was woken + @retval false idle wait timeout exceeded (the current thread must shutdown) +*/ +bool thread_pool_generic::wait_for_tasks(std::unique_lock &lk, + worker_data *thread_data) +{ + assert(m_task_queue.empty()); + assert(!m_in_shutdown); + + thread_data->m_wake_reason= WAKE_REASON_NONE; + m_active_threads.erase(thread_data); + m_standby_threads.push_back(thread_data); + + for (;;) + { + thread_data->m_cv.wait_for(lk, m_thread_timeout); + + if (thread_data->m_wake_reason != WAKE_REASON_NONE) + { + /* Woke up not due to timeout.*/ + return true; + } + + if (thread_count() <= m_min_threads) + { + /* Do not shutdown thread, maintain required minimum of worker + threads.*/ + continue; + } + + /* + Woke up due to timeout, remove this thread's from the standby list. In + all other cases where it is signaled it is removed by the signaling + thread. + */ + m_standby_threads.erase(thread_data); + m_active_threads.push_back(thread_data); + return false; + } +} + + +/** + Workers "get next task" routine. 
+ + A task can be handed over to the current thread directly during submit(). + if thread_var->m_wake_reason == WAKE_REASON_TASK. + + Or a task can be taken from the task queue. + In case task queue is empty, the worker thread will park (wait for wakeup). +*/ +bool thread_pool_generic::get_task(worker_data *thread_var, task **t) +{ + std::unique_lock lk(m_mtx); + + if (thread_var->is_long_task()) + { + DBUG_ASSERT(m_long_tasks_count); + m_long_tasks_count--; + } + DBUG_ASSERT(!thread_var->is_waiting()); + thread_var->m_state = worker_data::NONE; + + while (m_task_queue.empty()) + { + if (m_in_shutdown) + return false; + + if (!wait_for_tasks(lk, thread_var)) + return false; + if (m_task_queue.empty()) + { + m_spurious_wakeups++; + continue; + } + } + + /* Dequeue from the task queue.*/ + *t= m_task_queue.front(); + m_task_queue.pop(); + m_tasks_dequeued++; + thread_var->m_state |= worker_data::EXECUTING_TASK; + thread_var->m_task_start_time = m_timestamp; + return true; +} + +/** Worker thread shutdown routine. */ +void thread_pool_generic::worker_end(worker_data* thread_data) +{ + std::lock_guard lk(m_mtx); + DBUG_ASSERT(!thread_data->is_long_task()); + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + + if (!thread_count() && m_in_shutdown) + { + /* Signal the destructor that no more threads are left. */ + m_cv_no_threads.notify_all(); + } +} + +extern "C" void set_tls_pool(tpool::thread_pool* pool); + +/* The worker get/execute task loop.*/ +void thread_pool_generic::worker_main(worker_data *thread_var) +{ + task* task; + set_tls_pool(this); + if(m_worker_init_callback) + m_worker_init_callback(); + + tls_worker_data = thread_var; + m_thread_creation_pending.clear(); + + while (get_task(thread_var, &task) && task) + { + task->execute(); + } + + if (m_worker_destroy_callback) + m_worker_destroy_callback(); + + worker_end(thread_var); +} + + +/* + Check if threadpool had been idle for a while + Switch off maintenance timer if it is in idle state + for too long. + + Helper function, to be used inside maintenance callback, + before m_last_activity is updated +*/ + +static const auto invalid_timestamp= std::chrono::system_clock::time_point::max(); +constexpr auto max_idle_time= std::chrono::minutes(1); + +/* Time since maintenance timer had nothing to do */ +static std::chrono::system_clock::time_point idle_since= invalid_timestamp; +void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now) +{ + DBUG_ASSERT(m_task_queue.empty()); + + /* + We think that there is no activity, if there were at most 2 tasks + since last time, and there is a spare thread. + The 2 tasks (and not 0) is to account for some periodic timers. 
+ */ + bool idle= m_standby_threads.m_count > 0; + + if (!idle) + { + idle_since= invalid_timestamp; + return; + } + + if (idle_since == invalid_timestamp) + { + idle_since= now; + return; + } + + /* Switch timer off after 1 minute of idle time */ + if (now - idle_since > max_idle_time) + { + idle_since= invalid_timestamp; + switch_timer(timer_state_t::OFF); + } +} + + +/* + Periodic job to fix thread count and concurrency, + in case of long tasks, etc +*/ +void thread_pool_generic::maintenance() +{ + /* + If pool is busy (i.e the its mutex is currently locked), we can + skip the maintenance task, some times, to lower mutex contention + */ + static int skip_counter; + const int MAX_SKIPS = 10; + std::unique_lock lk(m_mtx, std::defer_lock); + if (skip_counter == MAX_SKIPS) + { + lk.lock(); + } + else if (!lk.try_lock()) + { + skip_counter++; + return; + } + + skip_counter = 0; + + m_timestamp = std::chrono::system_clock::now(); + + if (m_task_queue.empty()) + { + check_idle(m_timestamp); + m_last_activity = m_tasks_dequeued + m_wakeups; + return; + } + + m_long_tasks_count = 0; + for (auto thread_data = m_active_threads.front(); + thread_data; + thread_data = thread_data->m_next) + { + if (thread_data->is_executing_task() && + !thread_data->is_waiting() && + (thread_data->is_long_task() + || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION))) + { + thread_data->m_state |= worker_data::LONG_TASK; + m_long_tasks_count++; + } + } + + maybe_wake_or_create_thread(); + + size_t thread_cnt = (int)thread_count(); + if (m_last_activity == m_tasks_dequeued + m_wakeups && + m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt) + { + // no progress made since last iteration. create new + // thread + add_thread(); + } + m_last_activity = m_tasks_dequeued + m_wakeups; + m_last_thread_count= thread_cnt; +} + +/* + Heuristic used for thread creation throttling. + Returns interval in milliseconds between thread creation + (depending on number of threads already in the pool, and + desired concurrency level) +*/ +static int throttling_interval_ms(size_t n_threads,size_t concurrency) +{ + if (n_threads < concurrency*4) + return 0; + + if (n_threads < concurrency*8) + return 50; + + if (n_threads < concurrency*16) + return 100; + + return 200; +} + +/* Create a new worker.*/ +bool thread_pool_generic::add_thread() +{ + if (m_thread_creation_pending.test_and_set()) + return false; + + size_t n_threads = thread_count(); + + if (n_threads >= m_max_threads) + return false; + if (n_threads >= m_min_threads) + { + auto now = std::chrono::system_clock::now(); + if (now - m_last_thread_creation < + std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency))) + { + /* + Throttle thread creation and wakeup deadlock detection timer, + if is it off. 
+ */ + switch_timer(timer_state_t::ON); + + return false; + } + } + + worker_data *thread_data = m_thread_data_cache.get(); + m_active_threads.push_back(thread_data); + try + { + std::thread thread(&thread_pool_generic::worker_main, this, thread_data); + m_last_thread_creation = std::chrono::system_clock::now(); + thread.detach(); + } + catch (std::system_error& e) + { + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + static bool warning_written; + if (!warning_written) + { + fprintf(stderr, "Warning : threadpool thread could not be created :%s," + "current number of threads in pool %zu\n", e.what(), thread_count()); + warning_written = true; + } + return false; + } + return true; +} + +/** Wake a standby thread, and hand the given task over to this thread. */ +bool thread_pool_generic::wake(worker_wake_reason reason, task *) +{ + assert(reason != WAKE_REASON_NONE); + + if (m_standby_threads.empty()) + return false; + auto var= m_standby_threads.back(); + m_standby_threads.pop_back(); + m_active_threads.push_back(var); + assert(var->m_wake_reason == WAKE_REASON_NONE); + var->m_wake_reason= reason; + var->m_cv.notify_one(); + m_wakeups++; + return true; +} + + +thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : + m_thread_data_cache(max_threads), + m_task_queue(10000), + m_standby_threads(), + m_active_threads(), + m_mtx(), + m_thread_timeout(std::chrono::milliseconds(60000)), + m_timer_interval(std::chrono::milliseconds(400)), + m_cv_no_threads(), + m_cv_timer(), + m_tasks_enqueued(), + m_tasks_dequeued(), + m_wakeups(), + m_spurious_wakeups(), + m_in_shutdown(), + m_timestamp(), + m_long_tasks_count(), + m_waiting_task_count(), + m_last_thread_creation(), + m_min_threads(min_threads), + m_max_threads(max_threads), + m_last_thread_count(), + m_last_activity(), + m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr) +{ + set_concurrency(); + // start the timer + m_maintenance_timer.set_time(0, (int)m_timer_interval.count()); +} + + +void thread_pool_generic::maybe_wake_or_create_thread() +{ + if (m_task_queue.empty()) + return; + DBUG_ASSERT(m_active_threads.size() >= static_cast(m_long_tasks_count + m_waiting_task_count)); + if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency) + return; + if (!m_standby_threads.empty()) + { + wake(WAKE_REASON_TASK); + } + else + { + add_thread(); + } +} + +bool thread_pool_generic::too_many_active_threads() +{ + return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > + m_concurrency* OVERSUBSCRIBE_FACTOR; +} + +void thread_pool_generic::set_concurrency(unsigned int concurrency) +{ + std::unique_lock lk(m_mtx); + if (concurrency == 0) + concurrency= 2 * std::thread::hardware_concurrency(); + m_concurrency = concurrency; + if (m_concurrency > m_max_threads) + m_concurrency = m_max_threads; + if (m_concurrency < m_min_threads) + m_concurrency = m_min_threads; + if (m_concurrency < 1) + m_concurrency = 1; +} + +/** Submit a new task*/ +void thread_pool_generic::submit_task(task* task) +{ + std::unique_lock lk(m_mtx); + if (m_in_shutdown) + return; + task->add_ref(); + m_tasks_enqueued++; + m_task_queue.push(task); + maybe_wake_or_create_thread(); +} + + +/* Notify thread pool that current thread is going to wait */ +void thread_pool_generic::wait_begin() +{ + if (!tls_worker_data || tls_worker_data->is_long_task()) + return; + std::unique_lock lk(m_mtx); + if(tls_worker_data->is_long_task()) + { + /* + Current task flag could 
have become "long-running" + while waiting for the lock, thus recheck. + */ + return; + } + DBUG_ASSERT(!tls_worker_data->is_waiting()); + tls_worker_data->m_state |= worker_data::WAITING; + m_waiting_task_count++; + + /* Maintain concurrency */ + maybe_wake_or_create_thread(); +} + + +void thread_pool_generic::wait_end() +{ + if (tls_worker_data && tls_worker_data->is_waiting()) + { + std::unique_lock lk(m_mtx); + tls_worker_data->m_state &= ~worker_data::WAITING; + m_waiting_task_count--; + } +} + + +void thread_pool_generic::switch_timer(timer_state_t state) +{ + if (m_timer_state == state) + return; + /* + We can't use timer::set_time, because mysys timers are deadlock + prone. + + Instead, to switch off we increase the timer period + and decrease period to switch on. + + This might introduce delays in thread creation when needed, + as period will only be changed when timer fires next time. + For this reason, we can't use very long periods for the "off" state. + */ + m_timer_state= state; + long long period= (state == timer_state_t::OFF) ? + m_timer_interval.count()*10: m_timer_interval.count(); + + m_maintenance_timer.set_period((int)period); +} + + +/** + Wake up all workers, and wait until they are gone + Stop the timer. +*/ +thread_pool_generic::~thread_pool_generic() +{ + /* + Stop AIO early. + This is needed to prevent AIO completion thread + from calling submit_task() on an object that is being destroyed. + */ + m_aio.reset(); + + /* Also stop the maintanence task early. */ + m_maintenance_timer.disarm(); + + std::unique_lock lk(m_mtx); + m_in_shutdown= true; + + /* Wake up idle threads. */ + while (wake(WAKE_REASON_SHUTDOWN)) + { + } + + while (thread_count()) + { + m_cv_no_threads.wait(lk); + } + + lk.unlock(); +} + +thread_pool *create_thread_pool_generic(int min_threads, int max_threads) +{ + return new thread_pool_generic(min_threads, max_threads); +} + +} // namespace tpool diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h new file mode 100644 index 00000000..099ae5c7 --- /dev/null +++ b/tpool/tpool_structs.h @@ -0,0 +1,440 @@ +/* Copyright(C) 2019, 20222, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include +#include +#include +#include +#include +#include + +/* Suppress TSAN warnings, that we believe are not critical. */ +#if defined(__has_feature) +#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__) +#else +#define TPOOL_HAS_FEATURE(...) 
+
+} // namespace tpool
diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h
new file mode 100644
index 00000000..099ae5c7
--- /dev/null
+++ b/tpool/tpool_structs.h
+/* Copyright (C) 2019, 2022, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#pragma once
+#include <my_global.h>
+#include <my_pthread.h>
+#include <mysql/psi/mysql_thread.h>
+#include <vector>
+#include <assert.h>
+#include <algorithm>
+
+/* Suppress TSAN warnings that we believe are not critical. */
+#if defined(__has_feature)
+#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
+#else
+#define TPOOL_HAS_FEATURE(...) 0
+#endif
+
+#if TPOOL_HAS_FEATURE(address_sanitizer)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize("thread"),noinline))
+#elif defined(__GNUC__) && defined (__SANITIZE_THREAD__)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize_thread,noinline))
+#else
+#define TPOOL_SUPPRESS_TSAN
+#endif
+
+#ifdef HAVE_PSI_INTERFACE
+typedef unsigned int mysql_pfs_key_t;
+extern mysql_pfs_key_t tpool_cache_mutex_key;
+#endif
+
+namespace tpool
+{
+
+/**
+  Generic "pointer" cache of a fixed size
+  with fast put/get operations.
+
+  Compared to STL containers, e.g. stack or queue,
+  it is faster and does not do allocations.
+
+  However, the get() operation will wait if there are no free items.
+
+  We assume that put() will only put back elements that
+  were retrieved previously with get().
+*/
+template <typename T> class cache
+{
+  /** Protects updates of m_pos and m_cache members */
+  mysql_mutex_t m_mtx;
+
+  /**
+    Notifies waiting threads about "cache full" or "cache not empty" conditions
+    @see get() and wait()
+  */
+  pthread_cond_t m_cv;
+
+  /** Cached items vector. Does not change after construction */
+  std::vector<T> m_base;
+
+  /**
+    Pointers to cached items. Protected by m_mtx. Does not grow after
+    construction. Elements in position [0,m_pos-1] are "borrowed",
+    elements in position [m_pos,capacity()-1] are "free"
+  */
+  std::vector<T*> m_cache;
+
+  /** Number of threads waiting for the "cache full" condition (see wait()).
+  Protected by m_mtx */
+  int m_waiters;
+
+  /** Current cache size. Protected by m_mtx */
+  size_t m_pos;
+
+private:
+
+  inline size_t capacity()
+  {
+    return m_base.size();
+  }
+
+  /**
+    @return true if cache is full (no items are borrowed)
+  */
+  bool is_full()
+  {
+    return m_pos == 0;
+  }
+
+  /**
+    @return true if cache is empty (all items are borrowed)
+  */
+  bool is_empty()
+  {
+    return m_pos == capacity();
+  }
+
+public:
+  /**
+    Constructor
+    @param size - maximum number of items in cache
+  */
+  cache(size_t size) : m_base(size), m_cache(size),
+    m_waiters(), m_pos(0)
+  {
+    mysql_mutex_init(tpool_cache_mutex_key, &m_mtx, nullptr);
+    pthread_cond_init(&m_cv, nullptr);
+
+    for (size_t i= 0; i < size; i++)
+      m_cache[i]= &m_base[i];
+  }
+
+  ~cache()
+  {
+    mysql_mutex_destroy(&m_mtx);
+    pthread_cond_destroy(&m_cv);
+  }
+
+  /**
+    Retrieve an item from the cache. Waits for a free item, if the cache is
+    currently empty.
+    @return borrowed item
+  */
+  T* get()
+  {
+    mysql_mutex_lock(&m_mtx);
+    while (is_empty())
+      my_cond_wait(&m_cv, &m_mtx.m_mutex);
+    assert(m_pos < capacity());
+    // return the first free element
+    T *t= m_cache[m_pos++];
+    mysql_mutex_unlock(&m_mtx);
+    return t;
+  }
+
+  mysql_mutex_t &mutex() { return m_mtx; }
+
+  /**
+    Put an element back into the cache.
+    @param ele element to put back
+  */
+  void put(T *ele)
+  {
+    mysql_mutex_lock(&m_mtx);
+    assert(!is_full());
+    // put the element to the logical end of the array
+    m_cache[--m_pos] = ele;
+
+    /* Notify waiters when the cache becomes
+    not empty, or when it becomes full */
+    if (m_pos == 1 || (m_waiters && is_full()))
+      pthread_cond_broadcast(&m_cv);
+    mysql_mutex_unlock(&m_mtx);
+  }
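+
+  /*
+    Usage sketch (illustration only): callers borrow preallocated items and
+    return them when done; get() blocks while every item is borrowed.
+
+      cache<worker_data> c(16);   // 16 preallocated elements
+      worker_data *w= c.get();    // borrow; may wait until put() is called
+      // ... use *w ...
+      c.put(w);                   // return; may wake waiters
+  */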
+
+  /** Check whether a pointer represents a cached element */
+  bool contains(T* ele)
+  {
+    // No locking required, m_base does not change after construction.
+    return ele >= &m_base[0] && ele <= &m_base[capacity() - 1];
+  }
+
+  /** Wait until the cache is full
+  @param m cache mutex (locked) */
+  void wait(mysql_mutex_t &m)
+  {
+    mysql_mutex_assert_owner(&m);
+    m_waiters++;
+    while (!is_full())
+      my_cond_wait(&m_cv, &m.m_mutex);
+    m_waiters--;
+  }
+
+  /* Wait until the cache is full. */
+  void wait()
+  {
+    mysql_mutex_lock(&m_mtx);
+    wait(m_mtx);
+    mysql_mutex_unlock(&m_mtx);
+  }
+
+  /**
+    @return approximate number of "borrowed" items.
+    A "dirty" read, not used in any critical functionality.
+  */
+  TPOOL_SUPPRESS_TSAN size_t pos()
+  {
+    return m_pos;
+  }
+
+  void resize(size_t count)
+  {
+    mysql_mutex_assert_owner(&m_mtx);
+    assert(is_full());
+    m_base.resize(count);
+    m_cache.resize(count);
+    for (size_t i = 0; i < count; i++)
+      m_cache[i] = &m_base[i];
+  }
+};
+
+
+/**
+  Circular, fixed size queue
+  used for the task queue.
+
+  Compared to an STL queue, this one is
+  faster, and does not do memory allocations.
+*/
+template <typename T> class circular_queue
+{
+
+public:
+  circular_queue(size_t N = 16)
+    : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail()
+  {
+  }
+  bool empty() { return m_head == m_tail; }
+  bool full() { return (m_head + 1) % m_capacity == m_tail; }
+  void clear() { m_head = m_tail = 0; }
+  void resize(size_t new_size)
+  {
+    auto current_size = size();
+    if (new_size <= current_size)
+      return;
+    size_t new_capacity = new_size - 1;
+    std::vector<T> new_buffer(new_capacity);
+    /* TODO: figure out a faster way to copy */
+    size_t i = 0;
+    while (!empty())
+    {
+      T& ele = front();
+      pop();
+      new_buffer[i++] = ele;
+    }
+    m_buffer = new_buffer;
+    m_capacity = new_capacity;
+    m_tail = 0;
+    m_head = current_size;
+  }
+  void push(T ele)
+  {
+    if (full())
+    {
+      assert(size() == m_capacity - 1);
+      resize(size() + 1024);
+    }
+    m_buffer[m_head] = ele;
+    m_head = (m_head + 1) % m_capacity;
+  }
+  void push_front(T ele)
+  {
+    if (full())
+    {
+      resize(size() + 1024);
+    }
+    if (m_tail == 0)
+      m_tail = m_capacity - 1;
+    else
+      m_tail--;
+    m_buffer[m_tail] = ele;
+  }
+  T& front()
+  {
+    assert(!empty());
+    return m_buffer[m_tail];
+  }
+  void pop()
+  {
+    assert(!empty());
+    m_tail = (m_tail + 1) % m_capacity;
+  }
+  size_t size()
+  {
+    if (m_head < m_tail)
+    {
+      return m_capacity - m_tail + m_head;
+    }
+    else
+    {
+      return m_head - m_tail;
+    }
+  }
+
+  /* Iterator over elements in the queue. */
+  class iterator
+  {
+    size_t m_pos;
+    circular_queue* m_queue;
+  public:
+    explicit iterator(size_t pos, circular_queue* q) : m_pos(pos), m_queue(q) {}
+    iterator& operator++()
+    {
+      m_pos= (m_pos + 1) % m_queue->m_capacity;
+      return *this;
+    }
+    iterator operator++(int)
+    {
+      iterator retval= *this;
+      ++*this;
+      return retval;
+    }
+    bool operator==(iterator other) const { return m_pos == other.m_pos; }
+    bool operator!=(iterator other) const { return !(*this == other); }
+    T& operator*() const { return m_queue->m_buffer[m_pos]; }
+  };
+
+  iterator begin() { return iterator(m_tail, this); }
+  iterator end() { return iterator(m_head, this); }
+private:
+  size_t m_capacity;
+  std::vector<T> m_buffer;
+  size_t m_head;
+  size_t m_tail;
+};
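+
+/*
+  Usage sketch (illustration only): the queue grows transparently on push()
+  once it is full, and push_front() puts an element at the head of the line.
+
+    circular_queue<int> q(4);
+    for (int i= 0; i < 10; i++)
+      q.push(i);          // resizes internally when capacity is exhausted
+    q.push_front(-1);     // will be dequeued first
+    int head= q.front();  // -1
+    q.pop();
+*/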
+
+/* Doubly linked list. Intrusive,
+   requires the element type to have m_next and m_prev pointers.
+*/
+template <typename T> class doubly_linked_list
+{
+public:
+  T* m_first;
+  T* m_last;
+  size_t m_count;
+  doubly_linked_list() : m_first(), m_last(), m_count()
+  {}
+  void check()
+  {
+    assert(!m_first || !m_first->m_prev);
+    assert(!m_last || !m_last->m_next);
+    assert((!m_first && !m_last && m_count == 0)
+           || (m_first != 0 && m_last != 0 && m_count > 0));
+    T* current = m_first;
+    for (size_t i = 1; i < m_count; i++)
+    {
+      current = current->m_next;
+    }
+    assert(current == m_last);
+    current = m_last;
+    for (size_t i = 1; i < m_count; i++)
+    {
+      current = current->m_prev;
+    }
+    assert(current == m_first);
+  }
+  T* front()
+  {
+    return m_first;
+  }
+  size_t size()
+  {
+    return m_count;
+  }
+  void push_back(T* ele)
+  {
+    ele->m_prev = m_last;
+    if (m_last)
+      m_last->m_next = ele;
+
+    ele->m_next = 0;
+    m_last = ele;
+    if (!m_first)
+      m_first = m_last;
+
+    m_count++;
+  }
+  T* back()
+  {
+    return m_last;
+  }
+  bool empty()
+  {
+    return m_count == 0;
+  }
+  void pop_back()
+  {
+    m_last = m_last->m_prev;
+    if (m_last)
+      m_last->m_next = 0;
+    else
+      m_first = 0;
+    m_count--;
+  }
+  bool contains(T* ele)
+  {
+    if (!ele)
+      return false;
+    T* current = m_first;
+    while (current)
+    {
+      if (current == ele)
+        return true;
+      current = current->m_next;
+    }
+    return false;
+  }
+
+  void erase(T* ele)
+  {
+    assert(contains(ele));
+
+    if (ele == m_first)
+    {
+      m_first = ele->m_next;
+      if (m_first)
+        m_first->m_prev = 0;
+      else
+        m_last = 0;
+    }
+    else if (ele == m_last)
+    {
+      assert(ele->m_prev);
+      m_last = ele->m_prev;
+      m_last->m_next = 0;
+    }
+    else
+    {
+      assert(ele->m_next);
+      assert(ele->m_prev);
+      ele->m_next->m_prev = ele->m_prev;
+      ele->m_prev->m_next = ele->m_next;
+    }
+    m_count--;
+  }
+};
+
+}
diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc
new file mode 100644
index 00000000..88168b26
--- /dev/null
+++ b/tpool/tpool_win.cc
+/* Copyright (C) 2019, 2021, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#include "tpool_structs.h"
+#include <tpool.h>
+#include <windows.h>
+#include <mutex>
+#include <atomic>
+
+/**
+  Implementation of tpool/aio based on the Windows native threadpool.
+*/
+
+namespace tpool
+{
+/**
+  Pool, based on the Windows native (Vista+) threadpool.
+*/
+class thread_pool_win : public thread_pool
+{
+  /**
+    Handle per-thread init/term functions.
+    Since it is Windows that creates the threads, and not us,
+    this is tricky. We employ thread local storage data
+    and check whether the init function was called, inside every callback.
+  */
+  struct tls_data
+  {
+    thread_pool_win *m_pool;
+    ~tls_data()
+    {
+      /* Call the thread termination function. */
+      if (!m_pool)
+        return;
+
+      if (m_pool->m_worker_destroy_callback)
+        m_pool->m_worker_destroy_callback();
+
+      m_pool->m_thread_count--;
+    }
+    /** This needs to be called before every IO or simple task callback. */
+    void callback_prolog(thread_pool_win* pool)
+    {
+      assert(pool);
+      assert(!m_pool || (m_pool == pool));
+      if (m_pool)
+      {
+        // TLS data is already initialized.
+        return;
+      }
+      m_pool = pool;
+      m_pool->m_thread_count++;
+      // Call the thread init function.
+      if (m_pool->m_worker_init_callback)
+        m_pool->m_worker_init_callback();
+    }
+  };
+
+  static thread_local struct tls_data tls_data;
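+
+  /*
+    Callback pattern sketch (illustration only): every native threadpool
+    callback starts with callback_prolog(), so a thread that Windows created
+    lazily still runs m_worker_init_callback exactly once, and the TLS
+    destructor pairs it with m_worker_destroy_callback on thread exit. The
+    callback below is hypothetical.
+
+      static void CALLBACK some_callback(PTP_CALLBACK_INSTANCE, void *ctx)
+      {
+        auto *pool= static_cast<thread_pool_win *>(ctx);
+        tls_data.callback_prolog(pool);  // init-once per pool thread
+        // ... actual work ...
+      }
+  */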
+  /** Timer */
+  class native_timer : public timer
+  {
+    std::mutex m_mtx; // protects against parallel execution
+    std::mutex m_shutdown_mtx; // protects m_on
+    PTP_TIMER m_ptp_timer;
+    callback_func m_func;
+    void *m_data;
+    thread_pool_win& m_pool;
+    int m_period;
+    bool m_on;
+
+    static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
+                                        PTP_TIMER callback_timer)
+    {
+      native_timer *timer= (native_timer *) context;
+      tls_data.callback_prolog(&timer->m_pool);
+      std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
+      if (!lk.try_lock())
+      {
+        /* Do not try to run timers in parallel */
+        return;
+      }
+      timer->m_func(timer->m_data);
+      if (timer->m_period)
+        timer->set_time(timer->m_period, timer->m_period);
+    }
+
+  public:
+    native_timer(thread_pool_win& pool, callback_func func, void* data) :
+      m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
+    {
+      m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
+    }
+    void set_time(int initial_delay_ms, int period_ms) override
+    {
+      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+      if (!m_on)
+        return;
+      long long initial_delay = -10000LL * initial_delay_ms;
+      SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+      SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
+      m_period = period_ms;
+    }
+    void disarm() override
+    {
+      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+      m_on = false;
+      SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+      lk.unlock();
+      /* Don't do it in the timer callback, that would hang */
+      WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
+    }
+
+    ~native_timer()
+    {
+      disarm();
+      CloseThreadpoolTimer(m_ptp_timer);
+    }
+  };
+  /** AIO handler */
+  class native_aio : public aio
+  {
+    thread_pool_win& m_pool;
+
+  public:
+    native_aio(thread_pool_win &pool, int max_io)
+      : m_pool(pool)
+    {
+    }
+
+    /**
+      Submit async IO.
+    */
+    virtual int submit_io(aiocb* cb) override
+    {
+      memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+
+      ULARGE_INTEGER uli;
+      uli.QuadPart = cb->m_offset;
+      cb->Offset = uli.LowPart;
+      cb->OffsetHigh = uli.HighPart;
+      cb->m_internal = this;
+      StartThreadpoolIo(cb->m_fh.m_ptp_io);
+
+      BOOL ok;
+      if (cb->m_opcode == aio_opcode::AIO_PREAD)
+        ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+      else
+        ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+      if (ok || (GetLastError() == ERROR_IO_PENDING))
+        return 0;
+
+      CancelThreadpoolIo(cb->m_fh.m_ptp_io);
+      return -1;
+    }
+
+    /**
+      PTP_WIN32_IO_CALLBACK-typed function, required parameter for
+      CreateThreadpoolIo(). The user callback and other auxiliary data are
+      put into the extended OVERLAPPED parameter.
+    */
+    static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+                                                PVOID context, PVOID overlapped,
+                                                ULONG io_result, ULONG_PTR nbytes,
+                                                PTP_IO io)
+    {
+      aiocb* cb = (aiocb*)overlapped;
+      native_aio* aio = (native_aio*)cb->m_internal;
+      tls_data.callback_prolog(&aio->m_pool);
+      cb->m_err = io_result;
+      cb->m_ret_len = (int)nbytes;
+      cb->m_internal_task.m_func = cb->m_callback;
+      cb->m_internal_task.m_group = cb->m_group;
+      cb->m_internal_task.m_arg = cb;
+      cb->m_internal_task.execute();
+    }
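+
+    /*
+      Usage sketch (illustration only): a file handle must be bound to the
+      pool with bind() before IO on it is submitted. Field names follow the
+      aiocb definition in tpool.h; the buffer and completion callback are
+      hypothetical.
+
+        tpool::aiocb cb{};
+        cb.m_fh= fh;         // a native_file_handle previously bind()-ed
+        cb.m_opcode= tpool::aio_opcode::AIO_PREAD;
+        cb.m_offset= 0;
+        cb.m_buffer= buf;
+        cb.m_len= len;
+        cb.m_callback= on_io_done;
+        aio->submit_io(&cb); // completion runs io_completion_callback
+    */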
+
+    /**
+      Binds the file handle via CreateThreadpoolIo().
+    */
+    virtual int bind(native_file_handle& fd) override
+    {
+      fd.m_ptp_io =
+        CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
+      if (fd.m_ptp_io)
+        return 0;
+      return -1;
+    }
+
+    /**
+      Unbind the file handle via CloseThreadpoolIo().
+    */
+    virtual int unbind(const native_file_handle& fd) override
+    {
+      if (fd.m_ptp_io)
+        CloseThreadpoolIo(fd.m_ptp_io);
+      return 0;
+    }
+  };
+
+  PTP_POOL m_ptp_pool;
+  TP_CALLBACK_ENVIRON m_env;
+  PTP_CLEANUP_GROUP m_cleanup;
+  const int TASK_CACHE_SIZE= 10000;
+
+  struct task_cache_entry
+  {
+    thread_pool_win *m_pool;
+    task* m_task;
+  };
+  cache<task_cache_entry> m_task_cache;
+  std::atomic<int> m_thread_count;
+public:
+  thread_pool_win(int min_threads= 0, int max_threads= 0)
+    : m_task_cache(TASK_CACHE_SIZE), m_thread_count(0)
+  {
+    InitializeThreadpoolEnvironment(&m_env);
+    m_ptp_pool= CreateThreadpool(NULL);
+    m_cleanup= CreateThreadpoolCleanupGroup();
+    SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
+    SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
+    if (min_threads)
+      SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
+    if (max_threads)
+      SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
+  }
+  ~thread_pool_win()
+  {
+    CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
+    CloseThreadpoolCleanupGroup(m_cleanup);
+    CloseThreadpool(m_ptp_pool);
+
+    // Wait until all threads have finished and TLS destructors have run.
+    while (m_thread_count)
+      Sleep(1);
+  }
+  /**
+    PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
+  */
+  static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
+  {
+    auto entry= (task_cache_entry *) param;
+    auto task= entry->m_task;
+
+    tls_data.callback_prolog(entry->m_pool);
+
+    entry->m_pool->m_task_cache.put(entry);
+
+    task->execute();
+  }
+  virtual void submit_task(task *task) override
+  {
+    auto entry= m_task_cache.get();
+    task->add_ref();
+    entry->m_pool= this;
+    entry->m_task= task;
+    if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
+      abort();
+  }
+
+  aio *create_native_aio(int max_io) override
+  {
+    return new native_aio(*this, max_io);
+  }
+
+  timer* create_timer(callback_func func, void* data) override
+  {
+    return new native_timer(*this, func, data);
+  }
+};
+
+thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
+
+thread_pool *create_thread_pool_win(int min_threads, int max_threads)
+{
+  return new thread_pool_win(min_threads, max_threads);
+}
+} // namespace tpool
diff --git a/tpool/wait_notification.cc b/tpool/wait_notification.cc
new file mode 100644
index 00000000..7743e2db
--- /dev/null
+++ b/tpool/wait_notification.cc
+#include <tpool.h>
+
+namespace tpool
+{
+static thread_local tpool::thread_pool* tls_thread_pool;
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool)
+{
+  tls_thread_pool = pool;
+}
+
+extern "C" void tpool_wait_begin()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_begin();
+}
+
+
+extern "C" void tpool_wait_end()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_end();
+}
+
+}
-- cgit v1.2.3