Diffstat (limited to 'tpool')
-rw-r--r-- | tpool/CMakeLists.txt | 29
-rw-r--r-- | tpool/aio_linux.cc | 193
-rw-r--r-- | tpool/aio_simulated.cc | 186
-rw-r--r-- | tpool/aio_win.cc | 139
-rw-r--r-- | tpool/task.cc | 108
-rw-r--r-- | tpool/task_group.cc | 100
-rw-r--r-- | tpool/tpool.h | 261
-rw-r--r-- | tpool/tpool_generic.cc | 920
-rw-r--r-- | tpool/tpool_structs.h | 357
-rw-r--r-- | tpool/tpool_win.cc | 292
-rw-r--r-- | tpool/wait_notification.cc | 25
11 files changed, 2610 insertions(+), 0 deletions(-)
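For orientation before the file-by-file diff: this commit introduces a small task/thread-pool library under tpool/. Below is a minimal usage sketch, assuming only the interfaces declared in tpool/tpool.h as added by this commit; the thread counts and the hello callback are illustrative, not part of the diff.

#include <tpool.h>
#include <cstdio>

static void hello(void *arg)
{
  printf("task executed, arg=%p\n", arg);
}

int main()
{
  /* A pool with 1..4 worker threads (sizes are illustrative). */
  tpool::thread_pool *pool= tpool::create_thread_pool_generic(1, 4);

  /* waitable_task supports wait(); a plain tpool::task does not. */
  tpool::waitable_task t(hello, nullptr);
  pool->submit_task(&t);
  t.wait();

  /* A task_group limits concurrency: with max_concurrency = 1, tasks
     in the group never run in parallel, even on a busy pool. */
  tpool::task_group serialized(1);
  tpool::waitable_task t2(hello, nullptr, &serialized);
  pool->submit_task(&t2);
  t2.wait();

  delete pool;
}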
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt new file mode 100644 index 00000000..3e3f8e0b --- /dev/null +++ b/tpool/CMakeLists.txt @@ -0,0 +1,29 @@ +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) +IF(WIN32) + SET(EXTRA_SOURCES tpool_win.cc aio_win.cc) +ELSE() + SET(EXTRA_SOURCES aio_linux.cc) +ENDIF() + +IF(CMAKE_SYSTEM_NAME STREQUAL "Linux") + CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H) + CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO) + IF(HAVE_LIBAIO_H AND HAVE_LIBAIO) + ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1) + LINK_LIBRARIES(aio) + ENDIF() +ENDIF() + +ADD_LIBRARY(tpool STATIC + aio_simulated.cc + tpool_structs.h + CMakeLists.txt + tpool.h + tpool_generic.cc + task_group.cc + task.cc + wait_notification.cc + ${EXTRA_SOURCES} +) + +INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include)
\ No newline at end of file
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
new file mode 100644
index 00000000..d9aa8be2
--- /dev/null
+++ b/tpool/aio_linux.cc
@@ -0,0 +1,193 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#include "tpool_structs.h"
+#include "tpool.h"
+
+#ifdef LINUX_NATIVE_AIO
+# include <thread>
+# include <atomic>
+# include <libaio.h>
+# include <sys/syscall.h>
+
+/**
+  Invoke the io_getevents() system call, without timeout parameter.
+
+  @param ctx     context from io_setup()
+  @param min_nr  minimum number of completion events to wait for
+  @param nr      maximum number of completion events to collect
+  @param ev      the collected events
+
+  In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
+  the io_getevents() implementation in libaio was "optimized" so that it
+  would elide the system call when there are no outstanding requests
+  and a timeout was specified.
+
+  The libaio code for dereferencing ctx would occasionally trigger
+  SIGSEGV if io_destroy() was concurrently invoked from another thread.
+  Hence, we have to use the raw system call.
+
+  WHY are we doing this at all?
+  Because we want io_destroy() from another thread to interrupt io_getevents().
+
+  And, WHY do we want io_destroy() from another thread to interrupt
+  io_getevents()?
+
+  Because there is no documented, libaio-friendly and race-condition-free way
+  to interrupt io_getevents(). io_destroy() coupled with the raw system call
+  has worked for us so far.
+
+  Historical note: in the past, we used io_getevents() with timeouts. We would
+  wake up periodically, check the shutdown flag, and return from the main
+  routine. This was admittedly safer, yet it cost periodic wakeups, which we
+  are no longer willing to pay.
+
+  @note we also rely on the undocumented property that io_destroy(ctx)
+  will make this version of io_getevents() return EINVAL.
+*/
+static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
+{
+  int saved_errno= errno;
+  int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
+                   min_nr, nr, ev, 0);
+  if (ret < 0)
+  {
+    ret= -errno;
+    errno= saved_errno;
+  }
+  return ret;
+}
+#endif
+
+
+/*
+  Linux AIO implementation, based on native AIO.
+  Requires libaio.h and -laio at compile time.
+
+  io_submit() is used to submit async IO.
+
+  A single thread collects completion notifications
+  with io_getevents() and forwards the IO completion callbacks to
+  the worker threadpool.
+*/
+namespace tpool
+{
+#ifdef LINUX_NATIVE_AIO
+
+class aio_linux final : public aio
+{
+  thread_pool *m_pool;
+  io_context_t m_io_ctx;
+  std::thread m_getevent_thread;
+  static std::atomic<bool> shutdown_in_progress;
+
+  static void getevent_thread_routine(aio_linux *aio)
+  {
+    /*
+      We collect events in small batches to hopefully reduce the
+      number of system calls.
+ */ + constexpr unsigned MAX_EVENTS= 256; + + io_event events[MAX_EVENTS]; + for (;;) + { + switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) { + case -EINTR: + continue; + case -EINVAL: + if (shutdown_in_progress) + return; + /* fall through */ + default: + if (ret < 0) + { + fprintf(stderr, "io_getevents returned %d\n", ret); + abort(); + return; + } + for (int i= 0; i < ret; i++) + { + const io_event &event= events[i]; + aiocb *iocb= static_cast<aiocb*>(event.obj); + if (static_cast<int>(event.res) < 0) + { + iocb->m_err= -event.res; + iocb->m_ret_len= 0; + } + else + { + iocb->m_ret_len= event.res; + iocb->m_err= 0; + } + iocb->m_internal_task.m_func= iocb->m_callback; + iocb->m_internal_task.m_arg= iocb; + iocb->m_internal_task.m_group= iocb->m_group; + aio->m_pool->submit_task(&iocb->m_internal_task); + } + } + } + } + +public: + aio_linux(io_context_t ctx, thread_pool *pool) + : m_pool(pool), m_io_ctx(ctx), + m_getevent_thread(getevent_thread_routine, this) + { + } + + ~aio_linux() + { + shutdown_in_progress= true; + io_destroy(m_io_ctx); + m_getevent_thread.join(); + shutdown_in_progress= false; + } + + int submit_io(aiocb *cb) override + { + io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len, + cb->m_offset); + if (cb->m_opcode != aio_opcode::AIO_PREAD) + cb->aio_lio_opcode= IO_CMD_PWRITE; + iocb *icb= static_cast<iocb*>(cb); + int ret= io_submit(m_io_ctx, 1, &icb); + if (ret == 1) + return 0; + errno= -ret; + return -1; + } + + int bind(native_file_handle&) override { return 0; } + int unbind(const native_file_handle&) override { return 0; } +}; + +std::atomic<bool> aio_linux::shutdown_in_progress; + +aio *create_linux_aio(thread_pool *pool, int max_io) +{ + io_context_t ctx; + memset(&ctx, 0, sizeof ctx); + if (int ret= io_setup(max_io, &ctx)) + { + fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret); + return nullptr; + } + return new aio_linux(ctx, pool); +} +#else +aio *create_linux_aio(thread_pool*, int) { return nullptr; } +#endif +} diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc new file mode 100644 index 00000000..93b2ae13 --- /dev/null +++ b/tpool/aio_simulated.cc @@ -0,0 +1,186 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#ifndef _WIN32 +#include <unistd.h> /* pread(), pwrite() */ +#endif +#include "tpool.h" +#include "tpool_structs.h" +#include <stdlib.h> +#include <string.h> + +namespace tpool +{ +#ifdef _WIN32 + +/* + In order to be able to execute synchronous IO even on file opened + with FILE_FLAG_OVERLAPPED, and to bypass to completion port, + we use valid event handle for the hEvent member of the OVERLAPPED structure, + with its low-order bit set. + + See MSDN docs for GetQueuedCompletionStatus() for description of this trick. 
+*/ +static DWORD fls_sync_io= FLS_OUT_OF_INDEXES; +HANDLE win_get_syncio_event() +{ + HANDLE h; + + h= (HANDLE) FlsGetValue(fls_sync_io); + if (h) + { + return h; + } + h= CreateEventA(NULL, FALSE, FALSE, NULL); + /* Set low-order bit to keeps I/O completion from being queued */ + h= (HANDLE)((uintptr_t) h | 1); + FlsSetValue(fls_sync_io, h); + return h; +} +#include <WinIoCtl.h> +static void __stdcall win_free_syncio_event(void *data) +{ + if (data) + { + CloseHandle((HANDLE) data); + } +} + +struct WinIoInit +{ + WinIoInit() + { + fls_sync_io= FlsAlloc(win_free_syncio_event); + if(fls_sync_io == FLS_OUT_OF_INDEXES) + abort(); + } + ~WinIoInit() { FlsFree(fls_sync_io); } +}; + +static WinIoInit win_io_init; + + +int pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (ReadFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + + return -1; +} + +int pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset) +{ + OVERLAPPED ov{}; + ULARGE_INTEGER uli; + uli.QuadPart= offset; + ov.Offset= uli.LowPart; + ov.OffsetHigh= uli.HighPart; + ov.hEvent= win_get_syncio_event(); + + if (WriteFile(h, buf, (DWORD) count, 0, &ov) || + (GetLastError() == ERROR_IO_PENDING)) + { + DWORD n_bytes; + if (GetOverlappedResult(h, &ov, &n_bytes, TRUE)) + return n_bytes; + } + return -1; +} +#endif + +/** + Simulated AIO. + + Executes IO synchronously in worker pool + and then calls the completion routine. +*/ +class simulated_aio : public aio +{ + thread_pool *m_pool; + +public: + simulated_aio(thread_pool *tp) + : m_pool(tp) + { + } + + static void simulated_aio_callback(void *param) + { + aiocb *cb= (aiocb *) param; +#ifdef _WIN32 + size_t ret_len; +#else + ssize_t ret_len; +#endif + int err= 0; + switch (cb->m_opcode) + { + case aio_opcode::AIO_PREAD: + ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + case aio_opcode::AIO_PWRITE: + ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset); + break; + default: + abort(); + } +#ifdef _WIN32 + if (static_cast<int>(ret_len) < 0) + err= GetLastError(); +#else + if (ret_len < 0) + err= errno; +#endif + cb->m_ret_len = ret_len; + cb->m_err = err; + cb->m_internal_task.m_func= cb->m_callback; + thread_pool *pool= (thread_pool *)cb->m_internal; + pool->submit_task(&cb->m_internal_task); + } + + virtual int submit_io(aiocb *aiocb) override + { + aiocb->m_internal_task.m_func = simulated_aio_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + aiocb->m_internal = m_pool; + m_pool->submit_task(&aiocb->m_internal_task); + return 0; + } + + virtual int bind(native_file_handle &fd) override { return 0; } + virtual int unbind(const native_file_handle &fd) override { return 0; } +}; + +aio *create_simulated_aio(thread_pool *tp) +{ + return new simulated_aio(tp); +} + +} // namespace tpool diff --git a/tpool/aio_win.cc b/tpool/aio_win.cc new file mode 100644 index 00000000..b44f705b --- /dev/null +++ b/tpool/aio_win.cc @@ -0,0 +1,139 @@ +/* Copyright(C) 2019 MariaDB Corporation. 
+ +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <algorithm> +#include <assert.h> +#include <condition_variable> +#include <iostream> +#include <limits.h> +#include <mutex> +#include <queue> +#include <stack> +#include <thread> +#include <vector> +#include <tpool.h> + +namespace tpool +{ + +/* + Windows AIO implementation, completion port based. + A single thread collects the completion notification with + GetQueuedCompletionStatus(), and forwards io completion callback + the worker threadpool +*/ +class tpool_generic_win_aio : public aio +{ + /* Thread that does collects completion status from the completion port. */ + std::thread m_thread; + + /* IOCP Completion port.*/ + HANDLE m_completion_port; + + /* The worker pool where completion routine is executed, as task. */ + thread_pool* m_pool; +public: + tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool) + { + m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); + m_thread = std::thread(aio_completion_thread_proc, this); + } + + /** + Task to be executed in the work pool. + */ + static void io_completion_task(void* data) + { + auto cb = (aiocb*)data; + cb->execute_callback(); + } + + void completion_thread_work() + { + for (;;) + { + DWORD n_bytes; + aiocb* aiocb; + ULONG_PTR key; + if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key, + (LPOVERLAPPED*)& aiocb, INFINITE)) + break; + + aiocb->m_err = 0; + aiocb->m_ret_len = n_bytes; + + if (n_bytes != aiocb->m_len) + { + if (GetOverlappedResult(aiocb->m_fh, aiocb, + (LPDWORD)& aiocb->m_ret_len, FALSE)) + { + aiocb->m_err = GetLastError(); + } + } + aiocb->m_internal_task.m_func = aiocb->m_callback; + aiocb->m_internal_task.m_arg = aiocb; + aiocb->m_internal_task.m_group = aiocb->m_group; + m_pool->submit_task(&aiocb->m_internal_task); + } + } + + static void aio_completion_thread_proc(tpool_generic_win_aio* aio) + { + aio->completion_thread_work(); + } + + ~tpool_generic_win_aio() + { + if (m_completion_port) + CloseHandle(m_completion_port); + m_thread.join(); + } + + virtual int submit_io(aiocb* cb) override + { + memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED)); + cb->m_internal = this; + ULARGE_INTEGER uli; + uli.QuadPart = cb->m_offset; + cb->Offset = uli.LowPart; + cb->OffsetHigh = uli.HighPart; + + BOOL ok; + if (cb->m_opcode == aio_opcode::AIO_PREAD) + ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + else + ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb); + + if (ok || (GetLastError() == ERROR_IO_PENDING)) + return 0; + return -1; + } + + // Inherited via aio + virtual int bind(native_file_handle& fd) override + { + return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 
0 + : GetLastError(); + } + virtual int unbind(const native_file_handle& fd) override { return 0; } +}; + +aio* create_win_aio(thread_pool* pool, int max_io) +{ + return new tpool_generic_win_aio(pool, max_io); +} + +} // namespace tpool diff --git a/tpool/task.cc b/tpool/task.cc new file mode 100644 index 00000000..0b5253bc --- /dev/null +++ b/tpool/task.cc @@ -0,0 +1,108 @@ +/* Copyright (C) 2019, 2020, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <tpool.h> +#include <queue> +#include <mutex> +#include <condition_variable> +#include <tpool_structs.h> + +namespace tpool +{ + +#ifndef DBUG_OFF +static callback_func_np after_task_callback; +void set_after_task_callback(callback_func_np cb) +{ + after_task_callback= cb; +} + +void execute_after_task_callback() +{ + if (after_task_callback) + after_task_callback(); +} +#endif + + task::task(callback_func func, void* arg, task_group* group) : + m_func(func), m_arg(arg), m_group(group) {} + + void task::execute() + { + if (m_group) + { + /* Executing in a group (limiting concurrency).*/ + m_group->execute(this); + } + else + { + /* Execute directly. */ + m_func(m_arg); + dbug_execute_after_task_callback(); + release(); + } + } + + /* Task that provide wait() operation. */ + waitable_task::waitable_task(callback_func func, void* arg, task_group* group) : + task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){} + + void waitable_task::add_ref() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count++; + } + + void waitable_task::release() + { + std::unique_lock<std::mutex> lk(m_mtx); + m_ref_count--; + if (!m_ref_count && m_waiter_count) + m_cv.notify_all(); + } + void waitable_task::wait(std::unique_lock<std::mutex>& lk) + { + m_waiter_count++; + while (m_ref_count) + m_cv.wait(lk); + m_waiter_count--; + } + void waitable_task::wait() + { + std::unique_lock<std::mutex> lk(m_mtx); + wait(lk); + } + + static void noop(void*) + { + } + void waitable_task::disable() + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_func == noop) + return; + wait(lk); + m_original_func = m_func; + m_func = noop; + } + void waitable_task::enable() + { + std::unique_lock<std::mutex> lk(m_mtx); + if(m_func != noop) + return; + wait(lk); + m_func = m_original_func; + } +} diff --git a/tpool/task_group.cc b/tpool/task_group.cc new file mode 100644 index 00000000..97fbb091 --- /dev/null +++ b/tpool/task_group.cc @@ -0,0 +1,100 @@ +/* Copyright(C) 2019 MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include <tpool.h> +#include <queue> +#include <mutex> +#include <condition_variable> +#include <tpool_structs.h> +#include <thread> +#include <assert.h> +#ifndef _WIN32 +#include <unistd.h> // usleep +#endif +namespace tpool +{ + task_group::task_group(unsigned int max_concurrency) : + m_queue(8), + m_mtx(), + m_tasks_running(), + m_max_concurrent_tasks(max_concurrency) + {}; + + void task_group::set_max_tasks(unsigned int max_concurrency) + { + std::unique_lock<std::mutex> lk(m_mtx); + m_max_concurrent_tasks = max_concurrency; + } + void task_group::execute(task* t) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_tasks_running == m_max_concurrent_tasks) + { + /* Queue for later execution by another thread.*/ + m_queue.push(t); + return; + } + m_tasks_running++; + for (;;) + { + lk.unlock(); + if (t) + { + t->m_func(t->m_arg); + dbug_execute_after_task_callback(); + t->release(); + } + lk.lock(); + + if (m_queue.empty()) + break; + t = m_queue.front(); + m_queue.pop(); + } + m_tasks_running--; + } + + void task_group::cancel_pending(task* t) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!t) + m_queue.clear(); + for (auto it = m_queue.begin(); it != m_queue.end(); it++) + { + if (*it == t) + { + (*it)->release(); + (*it) = nullptr; + } + } + } + + task_group::~task_group() + { + std::unique_lock<std::mutex> lk(m_mtx); + assert(m_queue.empty()); + + while (m_tasks_running) + { + lk.unlock(); +#ifndef _WIN32 + usleep(1000); +#else + Sleep(1); +#endif + lk.lock(); + } + } +} diff --git a/tpool/tpool.h b/tpool/tpool.h new file mode 100644 index 00000000..3a5658c0 --- /dev/null +++ b/tpool/tpool.h @@ -0,0 +1,261 @@ +/* Copyright (C) 2019, 2020, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include <memory> /* unique_ptr */ +#include <condition_variable> +#include <mutex> +#include <atomic> +#include <tpool_structs.h> +#ifdef LINUX_NATIVE_AIO +#include <libaio.h> +#endif +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include <windows.h> +/** + Windows-specific native file handle struct. + Apart from the actual handle, contains PTP_IO + used by the Windows threadpool. +*/ +struct native_file_handle +{ + HANDLE m_handle; + PTP_IO m_ptp_io; + native_file_handle(){}; + native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {} + operator HANDLE() const { return m_handle; } +}; +#else +#include <unistd.h> +typedef int native_file_handle; +#endif + +namespace tpool +{ +/** + Task callback function + */ +typedef void (*callback_func)(void *); +typedef void (*callback_func_np)(void); +class task; + +/** A class that can be used e.g. for +restricting concurrency for specific class of tasks. 
*/ + +class task_group +{ +private: + circular_queue<task*> m_queue; + std::mutex m_mtx; + std::condition_variable m_cv; + unsigned int m_tasks_running; + unsigned int m_max_concurrent_tasks; +public: + task_group(unsigned int max_concurrency= 100000); + void set_max_tasks(unsigned int max_concurrent_tasks); + void execute(task* t); + void cancel_pending(task *t); + ~task_group(); +}; + + +class task +{ +public: + callback_func m_func; + void *m_arg; + task_group* m_group; + virtual void add_ref() {}; + virtual void release() {}; + task() {}; + task(callback_func func, void* arg, task_group* group = nullptr); + void* get_arg() { return m_arg; } + callback_func get_func() { return m_func; } + virtual void execute(); + virtual ~task() {} +}; + +class waitable_task :public task +{ + std::mutex m_mtx; + std::condition_variable m_cv; + int m_ref_count; + int m_waiter_count; + callback_func m_original_func; + void wait(std::unique_lock<std::mutex>&lk); +public: + waitable_task(callback_func func, void* arg, task_group* group = nullptr); + void add_ref() override; + void release() override; + TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; } + TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;} + void wait(); + void disable(); + void enable(); + virtual ~waitable_task() {}; +}; +enum class aio_opcode +{ + AIO_PREAD, + AIO_PWRITE +}; +constexpr size_t MAX_AIO_USERDATA_LEN= 3 * sizeof(void*); + +/** IO control block, includes parameters for the IO, and the callback*/ + +struct aiocb +#ifdef _WIN32 + :OVERLAPPED +#elif defined LINUX_NATIVE_AIO + :iocb +#endif +{ + native_file_handle m_fh; + aio_opcode m_opcode; + unsigned long long m_offset; + void *m_buffer; + unsigned int m_len; + callback_func m_callback; + task_group* m_group; + /* Returned length and error code*/ + size_t m_ret_len; + int m_err; + void *m_internal; + task m_internal_task; + alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN]; + + aiocb() : m_internal_task(nullptr, nullptr) + {} + void execute_callback() + { + task t(m_callback, this,m_group); + t.execute(); + } +}; + + +/** + AIO interface +*/ +class aio +{ +public: + /** + Submit asyncronous IO. + On completion, cb->m_callback is executed. 
+ */ + virtual int submit_io(aiocb *cb)= 0; + /** "Bind" file to AIO handler (used on Windows only) */ + virtual int bind(native_file_handle &fd)= 0; + /** "Unind" file to AIO handler (used on Windows only) */ + virtual int unbind(const native_file_handle &fd)= 0; + virtual ~aio(){}; +}; + +class timer +{ +public: + virtual void set_time(int initial_delay_ms, int period_ms) = 0; + virtual void disarm() = 0; + virtual ~timer(){} +}; + +class thread_pool; + +extern aio *create_simulated_aio(thread_pool *tp); + +#ifndef DBUG_OFF +/* + This function is useful for debugging to make sure all mutexes are released + inside a task callback +*/ +void set_after_task_callback(callback_func_np cb); +void execute_after_task_callback(); +#define dbug_execute_after_task_callback() execute_after_task_callback() +#else +#define dbug_execute_after_task_callback() do{}while(0) +#endif + +class thread_pool +{ +protected: + /* AIO handler */ + std::unique_ptr<aio> m_aio; + virtual aio *create_native_aio(int max_io)= 0; + + /** + Functions to be called at worker thread start/end + can be used for example to set some TLS variables + */ + void (*m_worker_init_callback)(void); + void (*m_worker_destroy_callback)(void); + +public: + thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback() + { + } + virtual void submit_task(task *t)= 0; + virtual timer* create_timer(callback_func func, void *data=nullptr) = 0; + void set_thread_callbacks(void (*init)(), void (*destroy)()) + { + m_worker_init_callback= init; + m_worker_destroy_callback= destroy; + } + int configure_aio(bool use_native_aio, int max_io) + { + if (use_native_aio) + m_aio.reset(create_native_aio(max_io)); + else + m_aio.reset(create_simulated_aio(this)); + return !m_aio ? -1 : 0; + } + void disable_aio() + { + m_aio.reset(); + } + int bind(native_file_handle &fd) { return m_aio->bind(fd); } + void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); } + int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } + virtual void wait_begin() {}; + virtual void wait_end() {}; + virtual ~thread_pool() {} +}; +const int DEFAULT_MIN_POOL_THREADS= 1; +const int DEFAULT_MAX_POOL_THREADS= 500; +extern thread_pool * +create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); +extern "C" void tpool_wait_begin(); +extern "C" void tpool_wait_end(); +#ifdef _WIN32 +extern thread_pool * +create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); + +/* + Helper functions, to execute pread/pwrite even if file is + opened with FILE_FLAG_OVERLAPPED, and bound to completion + port. +*/ +int pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +int pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +HANDLE win_get_syncio_event(); +#endif +} // namespace tpool diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc new file mode 100644 index 00000000..7c645b09 --- /dev/null +++ b/tpool/tpool_generic.cc @@ -0,0 +1,920 @@ +/* Copyright (C) 2019, 2020, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. 
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#include "tpool_structs.h"
+#include <limits.h>
+#include <algorithm>
+#include <assert.h>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <iostream>
+#include <limits.h>
+#include <mutex>
+#include <queue>
+#include <stack>
+#include <thread>
+#include <vector>
+#include "tpool.h"
+#include <assert.h>
+#include <my_global.h>
+#include <my_dbug.h>
+#include <thr_timer.h>
+#include <stdlib.h>
+
+namespace tpool
+{
+
+#ifdef __linux__
+  extern aio* create_linux_aio(thread_pool* tp, int max_io);
+#endif
+#ifdef _WIN32
+  extern aio* create_win_aio(thread_pool* tp, int max_io);
+#endif
+
+  static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
+  static const int OVERSUBSCRIBE_FACTOR = 2;
+
+/**
+  Implementation of the generic threadpool.
+  This threadpool consists of the following components:
+
+  - The task queue, populated by submit()
+  - Workers that execute the work items.
+  - A timer thread that takes care of pool health.
+
+  On submit(), a worker thread can be woken, or created,
+  to execute tasks.
+
+  The timer thread watches whether work items are being dequeued; if not,
+  this can indicate a potential deadlock.
+  Thus the timer thread can also wake or create a thread, to ensure some
+  progress.
+
+  Optimizations:
+
+  - worker threads that are idle for a long time will shut down.
+  - worker threads are woken in LIFO order, which minimizes context switching
+    and also ensures that the idle timeout works well. LIFO wakeup order
+    ensures that active threads stay active, and idle ones stay idle.
+
+*/
+
+/**
+  Worker wakeup flags.
+*/
+enum worker_wake_reason
+{
+  WAKE_REASON_NONE,
+  WAKE_REASON_TASK,
+  WAKE_REASON_SHUTDOWN
+};
+
+
+
+/* A per-worker thread structure.*/
+struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
+{
+  /** Condition variable to wake up this worker.*/
+  std::condition_variable m_cv;
+
+  /** Reason why the worker was woken. */
+  worker_wake_reason m_wake_reason;
+
+  /**
+    If the worker wakes up with WAKE_REASON_TASK, this is the task it needs
+    to execute.
+  */
+  task* m_task;
+
+  /** Struct is a member of an intrusive doubly linked list */
+  worker_data* m_prev;
+  worker_data* m_next;
+
+  /* Current state of the worker.*/
+  enum state
+  {
+    NONE = 0,
+    EXECUTING_TASK = 1,
+    LONG_TASK = 2,
+    WAITING = 4
+  };
+
+  int m_state;
+
+  bool is_executing_task()
+  {
+    return m_state & EXECUTING_TASK;
+  }
+  bool is_long_task()
+  {
+    return m_state & LONG_TASK;
+  }
+  bool is_waiting()
+  {
+    return m_state & WAITING;
+  }
+  std::chrono::system_clock::time_point m_task_start_time;
+  worker_data() :
+    m_cv(),
+    m_wake_reason(WAKE_REASON_NONE),
+    m_task(),
+    m_prev(),
+    m_next(),
+    m_state(NONE),
+    m_task_start_time()
+  {}
+
+  /* Define custom new/delete because of the overaligned structure. */
+  void* operator new(size_t size)
+  {
+#ifdef _WIN32
+    return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
+#else
+    void* ptr;
+    int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
+    return ret ? 0 : ptr;
+#endif
+  }
+  void operator delete(void* p)
+  {
+#ifdef _WIN32
+    _aligned_free(p);
+#else
+    free(p);
+#endif
+  }
+};
+
+
+static thread_local worker_data* tls_worker_data;
+
+class thread_pool_generic : public thread_pool
+{
+  /** Cache for per-worker structures */
+  cache<worker_data> m_thread_data_cache;
+
+  /** The task queue */
+  circular_queue<task*> m_task_queue;
+
+  /** List of standby (idle) workers */
+  doubly_linked_list<worker_data> m_standby_threads;
+
+  /** List of threads that are executing tasks */
+  doubly_linked_list<worker_data> m_active_threads;
+
+  /* Mutex that protects the whole struct, most importantly
+  the standby threads list, and the task queue */
+  std::mutex m_mtx;
+
+  /** Timeout after which an idle worker shuts down */
+  std::chrono::milliseconds m_thread_timeout;
+
+  /** How often the timer should wake up.*/
+  std::chrono::milliseconds m_timer_interval;
+
+  /** Another condition variable, used in pool shutdown */
+  std::condition_variable m_cv_no_threads;
+
+  /** Condition variable for the timer thread. Signaled on shutdown. */
+  std::condition_variable m_cv_timer;
+
+  /** Overall number of enqueued tasks */
+  unsigned long long m_tasks_enqueued;
+  unsigned long long m_group_enqueued;
+  /** Overall number of dequeued tasks. */
+  unsigned long long m_tasks_dequeued;
+
+  /** Statistics-related: number of worker thread wakeups */
+  int m_wakeups;
+
+  /**
+    Statistics-related: number of spurious thread wakeups
+    (i.e. the thread woke up, and the task queue is empty)
+  */
+  int m_spurious_wakeups;
+
+  /** The desired concurrency. This number of workers should be
+  actively executing. */
+  unsigned int m_concurrency;
+
+  /** True if the threadpool is being shut down, false otherwise */
+  bool m_in_shutdown;
+
+  /** Maintenance timer state: true = active (ON), false = inactive (OFF) */
+  enum class timer_state_t
+  {
+    OFF, ON
+  };
+  timer_state_t m_timer_state= timer_state_t::OFF;
+  void switch_timer(timer_state_t state);
+
+  /* Updates idle_since, and maybe switches the timer off */
+  void check_idle(std::chrono::system_clock::time_point now);
+
+  /** Time point when the timer last ran, used as a coarse clock. */
+  std::chrono::system_clock::time_point m_timestamp;
+
+  /** Number of long-running tasks. Long-running tasks are excluded when
+  adjusting concurrency */
+  unsigned int m_long_tasks_count;
+
+  unsigned int m_waiting_task_count;
+
+  /** Last time a thread was created */
+  std::chrono::system_clock::time_point m_last_thread_creation;
+
+  /** Minimum number of threads in this pool.*/
+  unsigned int m_min_threads;
+
+  /** Maximum number of threads in this pool.
*/ + unsigned int m_max_threads; + + /* maintenance related statistics (see maintenance()) */ + size_t m_last_thread_count; + unsigned long long m_last_activity; + + void worker_main(worker_data *thread_data); + void worker_end(worker_data* thread_data); + + /* Checks threadpool responsiveness, adjusts thread_counts */ + void maintenance(); + static void maintenance_func(void* arg) + { + ((thread_pool_generic *)arg)->maintenance(); + } + bool add_thread(); + bool wake(worker_wake_reason reason, task *t = nullptr); + void maybe_wake_or_create_thread(); + bool too_many_active_threads(); + bool get_task(worker_data *thread_var, task **t); + bool wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_var); + void cancel_pending(task* t); + + size_t thread_count() + { + return m_active_threads.size() + m_standby_threads.size(); + } +public: + thread_pool_generic(int min_threads, int max_threads); + ~thread_pool_generic(); + void wait_begin() override; + void wait_end() override; + void submit_task(task *task) override; + virtual aio *create_native_aio(int max_io) override + { +#ifdef _WIN32 + return create_win_aio(this, max_io); +#elif defined(__linux__) + return create_linux_aio(this,max_io); +#else + return nullptr; +#endif + } + + class timer_generic : public thr_timer_t, public timer + { + thread_pool_generic* m_pool; + waitable_task m_task; + callback_func m_callback; + void* m_data; + int m_period; + std::mutex m_mtx; + bool m_on; + std::atomic<bool> m_running; + + void run() + { + /* + In rare cases, multiple callbacks can be scheduled, + e.g with set_time(0,0) in a loop. + We do not allow parallel execution, as user is not prepared. + */ + bool expected = false; + if (!m_running.compare_exchange_strong(expected, true)) + return; + + m_callback(m_data); + dbug_execute_after_task_callback(); + m_running = false; + + if (m_pool && m_period) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (m_on) + { + DBUG_PUSH_EMPTY; + thr_timer_end(this); + thr_timer_settime(this, 1000ULL * m_period); + DBUG_POP_EMPTY; + } + } + } + + static void execute(void* arg) + { + auto timer = (timer_generic*)arg; + timer->run(); + } + + static void submit_task(void* arg) + { + timer_generic* timer = (timer_generic*)arg; + timer->m_pool->submit_task(&timer->m_task); + } + + public: + timer_generic(callback_func func, void* data, thread_pool_generic * pool): + m_pool(pool), + m_task(timer_generic::execute,this), + m_callback(func),m_data(data),m_period(0),m_mtx(), + m_on(true),m_running() + { + if (pool) + { + /* EXecute callback in threadpool*/ + thr_timer_init(this, submit_task, this); + } + else + { + /* run in "timer" thread */ + thr_timer_init(this, m_task.get_func(), m_task.get_arg()); + } + } + + void set_time(int initial_delay_ms, int period_ms) override + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!m_on) + return; + thr_timer_end(this); + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + thr_timer_settime(this, 1000ULL * initial_delay_ms); + } + + /* + Change only period of a periodic timer + (after the next execution). 
Workarounds + mysys timer deadlocks + */ + void set_period(int period_ms) + { + std::unique_lock<std::mutex> lk(m_mtx); + if (!m_on) + return; + if (!m_pool) + thr_timer_set_period(this, 1000ULL * period_ms); + else + m_period = period_ms; + } + + void disarm() override + { + std::unique_lock<std::mutex> lk(m_mtx); + m_on = false; + thr_timer_end(this); + lk.unlock(); + + if (m_task.m_group) + { + m_task.m_group->cancel_pending(&m_task); + } + if (m_pool) + { + m_pool->cancel_pending(&m_task); + } + m_task.wait(); + } + + virtual ~timer_generic() + { + disarm(); + } + }; + timer_generic m_maintenance_timer; + virtual timer* create_timer(callback_func func, void *data) override + { + return new timer_generic(func, data, this); + } +}; + +void thread_pool_generic::cancel_pending(task* t) +{ + std::unique_lock <std::mutex> lk(m_mtx); + for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++) + { + if (*it == t) + { + t->release(); + *it = nullptr; + } + } +} +/** + Register worker in standby list, and wait to be woken. + + @retval true if thread was woken + @retval false idle wait timeout exceeded (the current thread must shutdown) +*/ +bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk, + worker_data *thread_data) +{ + assert(m_task_queue.empty()); + assert(!m_in_shutdown); + + thread_data->m_wake_reason= WAKE_REASON_NONE; + m_active_threads.erase(thread_data); + m_standby_threads.push_back(thread_data); + + for (;;) + { + thread_data->m_cv.wait_for(lk, m_thread_timeout); + + if (thread_data->m_wake_reason != WAKE_REASON_NONE) + { + /* Woke up not due to timeout.*/ + return true; + } + + if (thread_count() <= m_min_threads) + { + /* Do not shutdown thread, maintain required minimum of worker + threads.*/ + continue; + } + + /* + Woke up due to timeout, remove this thread's from the standby list. In + all other cases where it is signaled it is removed by the signaling + thread. + */ + m_standby_threads.erase(thread_data); + m_active_threads.push_back(thread_data); + return false; + } +} + + +/** + Workers "get next task" routine. + + A task can be handed over to the current thread directly during submit(). + if thread_var->m_wake_reason == WAKE_REASON_TASK. + + Or a task can be taken from the task queue. + In case task queue is empty, the worker thread will park (wait for wakeup). +*/ +bool thread_pool_generic::get_task(worker_data *thread_var, task **t) +{ + std::unique_lock<std::mutex> lk(m_mtx); + + if (thread_var->is_long_task()) + { + DBUG_ASSERT(m_long_tasks_count); + m_long_tasks_count--; + } + DBUG_ASSERT(!thread_var->is_waiting()); + thread_var->m_state = worker_data::NONE; + + while (m_task_queue.empty()) + { + if (m_in_shutdown) + return false; + + if (!wait_for_tasks(lk, thread_var)) + return false; + if (m_task_queue.empty()) + { + m_spurious_wakeups++; + continue; + } + } + + /* Dequeue from the task queue.*/ + *t= m_task_queue.front(); + m_task_queue.pop(); + m_tasks_dequeued++; + thread_var->m_state |= worker_data::EXECUTING_TASK; + thread_var->m_task_start_time = m_timestamp; + return true; +} + +/** Worker thread shutdown routine. */ +void thread_pool_generic::worker_end(worker_data* thread_data) +{ + std::lock_guard<std::mutex> lk(m_mtx); + DBUG_ASSERT(!thread_data->is_long_task()); + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + + if (!thread_count() && m_in_shutdown) + { + /* Signal the destructor that no more threads are left. 
*/ + m_cv_no_threads.notify_all(); + } +} + +extern "C" void set_tls_pool(tpool::thread_pool* pool); + +/* The worker get/execute task loop.*/ +void thread_pool_generic::worker_main(worker_data *thread_var) +{ + task* task; + set_tls_pool(this); + if(m_worker_init_callback) + m_worker_init_callback(); + + tls_worker_data = thread_var; + + while (get_task(thread_var, &task) && task) + { + task->execute(); + } + + if (m_worker_destroy_callback) + m_worker_destroy_callback(); + + worker_end(thread_var); +} + + +/* + Check if threadpool had been idle for a while + Switch off maintenance timer if it is in idle state + for too long. + + Helper function, to be used inside maintenance callback, + before m_last_activity is updated +*/ + +static const auto invalid_timestamp= std::chrono::system_clock::time_point::max(); +constexpr auto max_idle_time= std::chrono::minutes(1); + +/* Time since maintenance timer had nothing to do */ +static std::chrono::system_clock::time_point idle_since= invalid_timestamp; +void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now) +{ + DBUG_ASSERT(m_task_queue.empty()); + + /* + We think that there is no activity, if there were at most 2 tasks + since last time, and there is a spare thread. + The 2 tasks (and not 0) is to account for some periodic timers. + */ + bool idle= m_standby_threads.m_count > 0; + + if (!idle) + { + idle_since= invalid_timestamp; + return; + } + + if (idle_since == invalid_timestamp) + { + idle_since= now; + return; + } + + /* Switch timer off after 1 minute of idle time */ + if (now - idle_since > max_idle_time) + { + idle_since= invalid_timestamp; + switch_timer(timer_state_t::OFF); + } +} + + +/* + Periodic job to fix thread count and concurrency, + in case of long tasks, etc +*/ +void thread_pool_generic::maintenance() +{ + /* + If pool is busy (i.e the its mutex is currently locked), we can + skip the maintenance task, some times, to lower mutex contention + */ + static int skip_counter; + const int MAX_SKIPS = 10; + std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock); + if (skip_counter == MAX_SKIPS) + { + lk.lock(); + } + else if (!lk.try_lock()) + { + skip_counter++; + return; + } + + skip_counter = 0; + + m_timestamp = std::chrono::system_clock::now(); + + if (m_task_queue.empty()) + { + check_idle(m_timestamp); + m_last_activity = m_tasks_dequeued + m_wakeups; + return; + } + + m_long_tasks_count = 0; + for (auto thread_data = m_active_threads.front(); + thread_data; + thread_data = thread_data->m_next) + { + if (thread_data->is_executing_task() && + !thread_data->is_waiting() && + (thread_data->is_long_task() + || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION))) + { + thread_data->m_state |= worker_data::LONG_TASK; + m_long_tasks_count++; + } + } + + maybe_wake_or_create_thread(); + + size_t thread_cnt = (int)thread_count(); + if (m_last_activity == m_tasks_dequeued + m_wakeups && + m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt) + { + // no progress made since last iteration. create new + // thread + add_thread(); + } + m_last_activity = m_tasks_dequeued + m_wakeups; + m_last_thread_count= thread_cnt; +} + +/* + Heuristic used for thread creation throttling. 
+ Returns interval in milliseconds between thread creation + (depending on number of threads already in the pool, and + desired concurrency level) +*/ +static int throttling_interval_ms(size_t n_threads,size_t concurrency) +{ + if (n_threads < concurrency*4) + return 0; + + if (n_threads < concurrency*8) + return 50; + + if (n_threads < concurrency*16) + return 100; + + return 200; +} + +/* Create a new worker.*/ +bool thread_pool_generic::add_thread() +{ + size_t n_threads = thread_count(); + + if (n_threads >= m_max_threads) + return false; + + if (n_threads >= m_min_threads) + { + auto now = std::chrono::system_clock::now(); + if (now - m_last_thread_creation < + std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency))) + { + /* + Throttle thread creation and wakeup deadlock detection timer, + if is it off. + */ + switch_timer(timer_state_t::ON); + + return false; + } + } + + worker_data *thread_data = m_thread_data_cache.get(); + m_active_threads.push_back(thread_data); + try + { + std::thread thread(&thread_pool_generic::worker_main, this, thread_data); + m_last_thread_creation = std::chrono::system_clock::now(); + thread.detach(); + } + catch (std::system_error& e) + { + m_active_threads.erase(thread_data); + m_thread_data_cache.put(thread_data); + static bool warning_written; + if (!warning_written) + { + fprintf(stderr, "Warning : threadpool thread could not be created :%s," + "current number of threads in pool %zu\n", e.what(), thread_count()); + warning_written = true; + } + return false; + } + return true; +} + +/** Wake a standby thread, and hand the given task over to this thread. */ +bool thread_pool_generic::wake(worker_wake_reason reason, task *) +{ + assert(reason != WAKE_REASON_NONE); + + if (m_standby_threads.empty()) + return false; + auto var= m_standby_threads.back(); + m_standby_threads.pop_back(); + m_active_threads.push_back(var); + assert(var->m_wake_reason == WAKE_REASON_NONE); + var->m_wake_reason= reason; + var->m_cv.notify_one(); + m_wakeups++; + return true; +} + + +thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) : + m_thread_data_cache(max_threads), + m_task_queue(10000), + m_standby_threads(), + m_active_threads(), + m_mtx(), + m_thread_timeout(std::chrono::milliseconds(60000)), + m_timer_interval(std::chrono::milliseconds(400)), + m_cv_no_threads(), + m_cv_timer(), + m_tasks_enqueued(), + m_tasks_dequeued(), + m_wakeups(), + m_spurious_wakeups(), + m_concurrency(std::thread::hardware_concurrency()*2), + m_in_shutdown(), + m_timestamp(), + m_long_tasks_count(), + m_waiting_task_count(), + m_last_thread_creation(), + m_min_threads(min_threads), + m_max_threads(max_threads), + m_last_thread_count(), + m_last_activity(), + m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr) +{ + + if (m_max_threads < m_concurrency) + m_concurrency = m_max_threads; + if (m_min_threads > m_concurrency) + m_concurrency = min_threads; + if (!m_concurrency) + m_concurrency = 1; + + // start the timer + m_maintenance_timer.set_time(0, (int)m_timer_interval.count()); +} + + +void thread_pool_generic::maybe_wake_or_create_thread() +{ + if (m_task_queue.empty()) + return; + DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count)); + if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency) + return; + if (!m_standby_threads.empty()) + { + wake(WAKE_REASON_TASK); + } + else + { + add_thread(); + } +} + +bool 
thread_pool_generic::too_many_active_threads() +{ + return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > + m_concurrency* OVERSUBSCRIBE_FACTOR; +} + +/** Submit a new task*/ +void thread_pool_generic::submit_task(task* task) +{ + std::unique_lock<std::mutex> lk(m_mtx); + if (m_in_shutdown) + return; + task->add_ref(); + m_tasks_enqueued++; + m_task_queue.push(task); + maybe_wake_or_create_thread(); +} + + +/* Notify thread pool that current thread is going to wait */ +void thread_pool_generic::wait_begin() +{ + if (!tls_worker_data || tls_worker_data->is_long_task()) + return; + std::unique_lock<std::mutex> lk(m_mtx); + if(tls_worker_data->is_long_task()) + { + /* + Current task flag could have become "long-running" + while waiting for the lock, thus recheck. + */ + return; + } + DBUG_ASSERT(!tls_worker_data->is_waiting()); + tls_worker_data->m_state |= worker_data::WAITING; + m_waiting_task_count++; + + /* Maintain concurrency */ + maybe_wake_or_create_thread(); +} + + +void thread_pool_generic::wait_end() +{ + if (tls_worker_data && tls_worker_data->is_waiting()) + { + std::unique_lock<std::mutex> lk(m_mtx); + tls_worker_data->m_state &= ~worker_data::WAITING; + m_waiting_task_count--; + } +} + + +void thread_pool_generic::switch_timer(timer_state_t state) +{ + if (m_timer_state == state) + return; + /* + We can't use timer::set_time, because mysys timers are deadlock + prone. + + Instead, to switch off we increase the timer period + and decrease period to switch on. + + This might introduce delays in thread creation when needed, + as period will only be changed when timer fires next time. + For this reason, we can't use very long periods for the "off" state. + */ + m_timer_state= state; + long long period= (state == timer_state_t::OFF) ? + m_timer_interval.count()*10: m_timer_interval.count(); + + m_maintenance_timer.set_period((int)period); +} + + +/** + Wake up all workers, and wait until they are gone + Stop the timer. +*/ +thread_pool_generic::~thread_pool_generic() +{ + /* + Stop AIO early. + This is needed to prevent AIO completion thread + from calling submit_task() on an object that is being destroyed. + */ + m_aio.reset(); + + /* Also stop the maintanence task early. */ + m_maintenance_timer.disarm(); + + std::unique_lock<std::mutex> lk(m_mtx); + m_in_shutdown= true; + + /* Wake up idle threads. */ + while (wake(WAKE_REASON_SHUTDOWN)) + { + } + + while (thread_count()) + { + m_cv_no_threads.wait(lk); + } + + lk.unlock(); +} + +thread_pool *create_thread_pool_generic(int min_threads, int max_threads) +{ + return new thread_pool_generic(min_threads, max_threads); +} + +} // namespace tpool diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h new file mode 100644 index 00000000..7b0fb857 --- /dev/null +++ b/tpool/tpool_structs.h @@ -0,0 +1,357 @@ +/* Copyright(C) 2019 MariaDB Corporation + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. 
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#pragma once
+#include <vector>
+#include <stack>
+#include <mutex>
+#include <condition_variable>
+#include <assert.h>
+#include <algorithm>
+
+
+/* Suppress TSAN warnings that we believe are not critical. */
+#if defined(__has_feature)
+#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
+#else
+#define TPOOL_HAS_FEATURE(...) 0
+#endif
+
+#if TPOOL_HAS_FEATURE(address_sanitizer)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize("thread"),noinline))
+#elif defined(__GNUC__) && defined (__SANITIZE_THREAD__)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize_thread,noinline))
+#else
+#define TPOOL_SUPPRESS_TSAN
+#endif
+
+namespace tpool
+{
+
+enum cache_notification_mode
+{
+  NOTIFY_ONE,
+  NOTIFY_ALL
+};
+
+/**
+  Generic "pointer" cache of a fixed size
+  with fast put/get operations.
+
+  Compared to STL containers, it is faster and does not
+  do allocations. However, the put() operation will wait
+  if there are no free items.
+*/
+template<typename T> class cache
+{
+  std::mutex m_mtx;
+  std::condition_variable m_cv;
+  std::vector<T> m_base;
+  std::vector<T*> m_cache;
+  cache_notification_mode m_notification_mode;
+  int m_waiters;
+
+  bool is_full()
+  {
+    return m_cache.size() == m_base.size();
+  }
+
+public:
+  cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL):
+    m_mtx(), m_cv(), m_base(count), m_cache(count), m_notification_mode(mode), m_waiters()
+  {
+    for(size_t i = 0 ; i < count; i++)
+      m_cache[i]=&m_base[i];
+  }
+
+  T* get(bool blocking=true)
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    if (blocking)
+    {
+      while(m_cache.empty())
+        m_cv.wait(lk);
+    }
+    else
+    {
+      if(m_cache.empty())
+        return nullptr;
+    }
+    T* ret = m_cache.back();
+    m_cache.pop_back();
+    return ret;
+  }
+
+
+  void put(T *ele)
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_cache.push_back(ele);
+    if (m_notification_mode == NOTIFY_ONE)
+      m_cv.notify_one();
+    else if(m_cache.size() == 1)
+      m_cv.notify_all(); // Signal that the cache is not empty
+    else if(m_waiters && is_full())
+      m_cv.notify_all(); // Signal that the cache is full
+  }
+
+  bool contains(T* ele)
+  {
+    return ele >= &m_base[0] && ele <= &m_base[m_base.size() -1];
+  }
+
+  /* Wait until the cache is full.*/
+  void wait()
+  {
+    std::unique_lock<std::mutex> lk(m_mtx);
+    m_waiters++;
+    while(!is_full())
+      m_cv.wait(lk);
+    m_waiters--;
+  }
+
+  TPOOL_SUPPRESS_TSAN size_t size()
+  {
+    return m_cache.size();
+  }
+};
+
+
+/**
+  Circular, fixed-size queue
+  used for the task queue.
+ + Compared to STL queue, this one is + faster, and does not do memory allocations +*/ +template <typename T> class circular_queue +{ + +public: + circular_queue(size_t N = 16) + : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail() + { + } + bool empty() { return m_head == m_tail; } + bool full() { return (m_head + 1) % m_capacity == m_tail; } + void clear() { m_head = m_tail = 0; } + void resize(size_t new_size) + { + auto current_size = size(); + if (new_size <= current_size) + return; + size_t new_capacity = new_size - 1; + std::vector<T> new_buffer(new_capacity); + /* Figure out faster way to copy*/ + size_t i = 0; + while (!empty()) + { + T& ele = front(); + pop(); + new_buffer[i++] = ele; + } + m_buffer = new_buffer; + m_capacity = new_capacity; + m_tail = 0; + m_head = current_size; + } + void push(T ele) + { + if (full()) + { + assert(size() == m_capacity - 1); + resize(size() + 1024); + } + m_buffer[m_head] = ele; + m_head = (m_head + 1) % m_capacity; + } + void push_front(T ele) + { + if (full()) + { + resize(size() + 1024); + } + if (m_tail == 0) + m_tail = m_capacity - 1; + else + m_tail--; + m_buffer[m_tail] = ele; + } + T& front() + { + assert(!empty()); + return m_buffer[m_tail]; + } + void pop() + { + assert(!empty()); + m_tail = (m_tail + 1) % m_capacity; + } + size_t size() + { + if (m_head < m_tail) + { + return m_capacity - m_tail + m_head; + } + else + { + return m_head - m_tail; + } + } + + /*Iterator over elements in queue.*/ + class iterator + { + size_t m_pos; + circular_queue<T>* m_queue; + public: + explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {} + iterator& operator++() + { + m_pos= (m_pos + 1) % m_queue->m_capacity; + return *this; + } + iterator operator++(int) + { + iterator retval= *this; + ++*this; + return retval; + } + bool operator==(iterator other) const { return m_pos == other.m_pos; } + bool operator!=(iterator other) const { return !(*this == other); } + T& operator*() const { return m_queue->m_buffer[m_pos]; } + }; + + iterator begin() { return iterator(m_tail, this); } + iterator end() { return iterator(m_head, this); } +private: + size_t m_capacity; + std::vector<T> m_buffer; + size_t m_head; + size_t m_tail; +}; + +/* Doubly linked list. Intrusive, + requires element to have m_next and m_prev pointers. 
+*/ +template<typename T> class doubly_linked_list +{ +public: + T* m_first; + T* m_last; + size_t m_count; + doubly_linked_list():m_first(),m_last(),m_count() + {} + void check() + { + assert(!m_first || !m_first->m_prev); + assert(!m_last || !m_last->m_next); + assert((!m_first && !m_last && m_count == 0) + || (m_first != 0 && m_last != 0 && m_count > 0)); + T* current = m_first; + for(size_t i=1; i< m_count;i++) + { + current = current->m_next; + } + assert(current == m_last); + current = m_last; + for (size_t i = 1; i < m_count; i++) + { + current = current->m_prev; + } + assert(current == m_first); + } + T* front() + { + return m_first; + } + size_t size() + { + return m_count; + } + void push_back(T* ele) + { + ele->m_prev = m_last; + if (m_last) + m_last->m_next = ele; + + ele->m_next = 0; + m_last = ele; + if (!m_first) + m_first = m_last; + + m_count++; + } + T* back() + { + return m_last; + } + bool empty() + { + return m_count == 0; + } + void pop_back() + { + m_last = m_last->m_prev; + if (m_last) + m_last->m_next = 0; + else + m_first = 0; + m_count--; + } + bool contains(T* ele) + { + if (!ele) + return false; + T* current = m_first; + while(current) + { + if(current == ele) + return true; + current = current->m_next; + } + return false; + } + + void erase(T* ele) + { + assert(contains(ele)); + + if (ele == m_first) + { + m_first = ele->m_next; + if (m_first) + m_first->m_prev = 0; + else + m_last = 0; + } + else if (ele == m_last) + { + assert(ele->m_prev); + m_last = ele->m_prev; + m_last->m_next = 0; + } + else + { + assert(ele->m_next); + assert(ele->m_prev); + ele->m_next->m_prev = ele->m_prev; + ele->m_prev->m_next = ele->m_next; + } + m_count--; + } +}; + +} diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc new file mode 100644 index 00000000..09fd49d9 --- /dev/null +++ b/tpool/tpool_win.cc @@ -0,0 +1,292 @@ +/* Copyright(C) 2019 MariaDB + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#include "tpool_structs.h" +#include <stdlib.h> +#include <tpool.h> +#include <windows.h> +#include <atomic> + +/** + Implementation of tpool/aio based on Windows native threadpool. +*/ + +namespace tpool +{ +/** + Pool, based on Windows native(Vista+) threadpool. +*/ +class thread_pool_win : public thread_pool +{ + /** + Handle per-thread init/term functions. + Since it is Windows that creates thread, and not us, + it is tricky. We employ thread local storage data + and check whether init function was called, inside every callback. + */ + struct tls_data + { + thread_pool_win *m_pool; + ~tls_data() + { + /* Call thread termination function. 
+
+}
diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc
new file mode 100644
index 00000000..09fd49d9
--- /dev/null
+++ b/tpool/tpool_win.cc
@@ -0,0 +1,292 @@
+/* Copyright (C) 2019 MariaDB
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA*/
+
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <tpool.h>
+#include <windows.h>
+#include <atomic>
+
+/**
+  Implementation of tpool/aio based on the Windows native threadpool.
+*/
+
+namespace tpool
+{
+/**
+  Pool, based on the Windows native (Vista+) threadpool.
+*/
+class thread_pool_win : public thread_pool
+{
+  /**
+    Handles the per-thread init/term functions.
+    Since it is Windows, not us, that creates the threads, this is
+    tricky: we employ thread-local storage, and check inside every
+    callback whether the init function has already been called.
+  */
+  struct tls_data
+  {
+    thread_pool_win *m_pool;
+    ~tls_data()
+    {
+      /* Call the thread termination function. */
+      if (!m_pool)
+        return;
+
+      if (m_pool->m_worker_destroy_callback)
+        m_pool->m_worker_destroy_callback();
+
+      m_pool->m_thread_count--;
+    }
+    /** This needs to be called before every IO or simple task callback. */
+    void callback_prolog(thread_pool_win* pool)
+    {
+      assert(pool);
+      assert(!m_pool || (m_pool == pool));
+      if (m_pool)
+      {
+        // TLS data already initialized.
+        return;
+      }
+      m_pool = pool;
+      m_pool->m_thread_count++;
+      // Call the thread init function.
+      if (m_pool->m_worker_init_callback)
+        m_pool->m_worker_init_callback();
+    }
+  };
+
+  static thread_local struct tls_data tls_data;
+  /** Timer */
+  class native_timer : public timer
+  {
+    std::mutex m_mtx; // protects against parallel execution
+    std::mutex m_shutdown_mtx; // protects m_on
+    PTP_TIMER m_ptp_timer;
+    callback_func m_func;
+    void *m_data;
+    thread_pool_win& m_pool;
+    int m_period;
+    bool m_on;
+
+    static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
+                                        PTP_TIMER callback_timer)
+    {
+      native_timer *timer= (native_timer *) context;
+      tls_data.callback_prolog(&timer->m_pool);
+      std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
+      if (!lk.try_lock())
+      {
+        /* Do not try to run timers in parallel. */
+        return;
+      }
+      timer->m_func(timer->m_data);
+      dbug_execute_after_task_callback();
+      /* Periodic timers are rearmed manually, rather than using the
+         threadpool's own period parameter. */
+      if (timer->m_period)
+        timer->set_time(timer->m_period, timer->m_period);
+    }
+
+  public:
+    native_timer(thread_pool_win& pool, callback_func func, void* data) :
+      m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
+    {
+      m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
+    }
+    void set_time(int initial_delay_ms, int period_ms) override
+    {
+      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+      if (!m_on)
+        return;
+      /* A negative FILETIME value means a relative due time, in 100ns units. */
+      long long initial_delay = -10000LL * initial_delay_ms;
+      /* Cancel any pending due time, then rearm the timer
+         (the 100ms window allows the system to batch expirations). */
+      SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+      SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
+      m_period = period_ms;
+    }
+    void disarm() override
+    {
+      std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+      m_on = false;
+      SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+      lk.unlock();
+      /* Don't wait from inside the timer callback; that would hang. */
+      WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
+    }
+
+    ~native_timer()
+    {
+      disarm();
+      CloseThreadpoolTimer(m_ptp_timer);
+    }
+  };
+  /** AIO handler */
+  class native_aio : public aio
+  {
+    thread_pool_win& m_pool;
+
+  public:
+    native_aio(thread_pool_win &pool, int max_io)
+      : m_pool(pool)
+    {
+    }
+
+    /**
+      Submit async IO.
+    */
+    virtual int submit_io(aiocb* cb) override
+    {
+      /* aiocb extends OVERLAPPED; zero out the OVERLAPPED part. */
+      memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+
+      /* Split the 64-bit offset into the two 32-bit OVERLAPPED fields. */
+      ULARGE_INTEGER uli;
+      uli.QuadPart = cb->m_offset;
+      cb->Offset = uli.LowPart;
+      cb->OffsetHigh = uli.HighPart;
+      cb->m_internal = this;
+      StartThreadpoolIo(cb->m_fh.m_ptp_io);
+
+      BOOL ok;
+      if (cb->m_opcode == aio_opcode::AIO_PREAD)
+        ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+      else
+        ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+      if (ok || (GetLastError() == ERROR_IO_PENDING))
+        return 0;
+
+      CancelThreadpoolIo(cb->m_fh.m_ptp_io);
+      return -1;
+    }
+
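+    /*
+      A sketch of how a caller would fill in an aiocb for submit_io()
+      (an editor's illustration, not part of the commit; the field names
+      are taken from this file, my_completion is hypothetical):
+
+        aiocb cb;
+        cb.m_fh= fd;                         // handle previously bind()-ed
+        cb.m_opcode= aio_opcode::AIO_PREAD;
+        cb.m_offset= 0;
+        cb.m_buffer= buf;
+        cb.m_len= len;
+        cb.m_callback= my_completion;        // runs on a pool thread
+        cb.m_group= nullptr;
+        if (aio->submit_io(&cb))
+          { /* submission failed; inspect GetLastError() */ }
+    */
+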
+    /**
+      PTP_WIN32_IO_CALLBACK-typed function, the required callback parameter
+      for CreateThreadpoolIo(). The user callback and other auxiliary data
+      are passed in via the extended OVERLAPPED parameter.
+    */
+    static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+                                                PVOID context, PVOID overlapped,
+                                                ULONG io_result, ULONG_PTR nbytes,
+                                                PTP_IO io)
+    {
+      aiocb* cb = (aiocb*)overlapped;
+      native_aio* aio = (native_aio*)cb->m_internal;
+      tls_data.callback_prolog(&aio->m_pool);
+      cb->m_err = io_result;
+      cb->m_ret_len = (int)nbytes;
+      /* Forward the completion to the user callback, as a task. */
+      cb->m_internal_task.m_func = cb->m_callback;
+      cb->m_internal_task.m_group = cb->m_group;
+      cb->m_internal_task.m_arg = cb;
+      cb->m_internal_task.execute();
+    }
+
+    /**
+      Binds the file handle via CreateThreadpoolIo().
+    */
+    virtual int bind(native_file_handle& fd) override
+    {
+      fd.m_ptp_io =
+        CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
+      if (fd.m_ptp_io)
+        return 0;
+      return -1;
+    }
+
+    /**
+      Unbinds the file handle via CloseThreadpoolIo().
+    */
+    virtual int unbind(const native_file_handle& fd) override
+    {
+      if (fd.m_ptp_io)
+        CloseThreadpoolIo(fd.m_ptp_io);
+      return 0;
+    }
+  };
+
+  PTP_POOL m_ptp_pool;
+  TP_CALLBACK_ENVIRON m_env;
+  PTP_CLEANUP_GROUP m_cleanup;
+  const int TASK_CACHE_SIZE= 10000;
+
+  struct task_cache_entry
+  {
+    thread_pool_win *m_pool;
+    task* m_task;
+  };
+  cache<task_cache_entry> m_task_cache;
+  std::atomic<int> m_thread_count;
+public:
+  thread_pool_win(int min_threads= 0, int max_threads= 0)
+    : m_task_cache(TASK_CACHE_SIZE), m_thread_count(0)
+  {
+    InitializeThreadpoolEnvironment(&m_env);
+    m_ptp_pool= CreateThreadpool(NULL);
+    m_cleanup= CreateThreadpoolCleanupGroup();
+    SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
+    SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
+    if (min_threads)
+      SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
+    if (max_threads)
+      SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
+  }
+
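+  /*
+    A shutdown sketch (an editor's illustration, not part of the commit):
+    a pool created via create_thread_pool_win(), defined at the bottom of
+    this file, is torn down by deleting it; the destructor below waits for
+    outstanding callbacks and for the worker TLS destructors to run.
+
+      thread_pool *pool= create_thread_pool_win(1, 16);
+      // ... submit_task() / create_timer() / create_native_aio() ...
+      delete pool;
+  */
+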
+  ~thread_pool_win()
+  {
+    CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
+    CloseThreadpoolCleanupGroup(m_cleanup);
+    CloseThreadpool(m_ptp_pool);
+
+    // Wait until all threads have finished and the TLS destructors have run.
+    while (m_thread_count)
+      Sleep(1);
+  }
+  /**
+    PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback().
+  */
+  static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
+  {
+    auto entry= (task_cache_entry *) param;
+    auto task= entry->m_task;
+
+    tls_data.callback_prolog(entry->m_pool);
+
+    /* Return the entry to the cache before running the task, so that it
+       can be reused while the (potentially long) callback executes. */
+    entry->m_pool->m_task_cache.put(entry);
+
+    task->execute();
+  }
+  virtual void submit_task(task *task) override
+  {
+    auto entry= m_task_cache.get();
+    task->add_ref();
+    entry->m_pool= this;
+    entry->m_task= task;
+    /* Failure here means the system is out of resources; treat as fatal. */
+    if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
+      abort();
+  }
+
+  aio *create_native_aio(int max_io) override
+  {
+    return new native_aio(*this, max_io);
+  }
+
+  timer* create_timer(callback_func func, void* data) override
+  {
+    return new native_timer(*this, func, data);
+  }
+};
+
+thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
+
+thread_pool *create_thread_pool_win(int min_threads, int max_threads)
+{
+  return new thread_pool_win(min_threads, max_threads);
+}
+} // namespace tpool
diff --git a/tpool/wait_notification.cc b/tpool/wait_notification.cc
new file mode 100644
index 00000000..7743e2db
--- /dev/null
+++ b/tpool/wait_notification.cc
@@ -0,0 +1,25 @@
+#include <tpool.h>
+
+namespace tpool
+{
+static thread_local tpool::thread_pool* tls_thread_pool;
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool)
+{
+  tls_thread_pool = pool;
+}
+
+extern "C" void tpool_wait_begin()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_begin();
+}
+
+extern "C" void tpool_wait_end()
+{
+  if (tls_thread_pool)
+    tls_thread_pool->wait_end();
+}
+
+}
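+
+/*
+  A minimal usage sketch of the wait notification API (an editor's
+  illustration, not part of the commit; it assumes set_tls_pool() has
+  already been pointed at the current pool, e.g. from a worker init
+  callback, and some_blocking_call() is hypothetical):
+
+    tpool_wait_begin();    // tell the pool this thread is about to block
+    some_blocking_call();  // e.g. waiting on a lock
+    tpool_wait_end();      // tell the pool the thread is runnable again
+*/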