summaryrefslogtreecommitdiffstats
path: root/tpool
diff options
context:
space:
mode:
Diffstat (limited to 'tpool')
-rw-r--r--tpool/CMakeLists.txt29
-rw-r--r--tpool/aio_linux.cc193
-rw-r--r--tpool/aio_simulated.cc186
-rw-r--r--tpool/aio_win.cc139
-rw-r--r--tpool/task.cc108
-rw-r--r--tpool/task_group.cc100
-rw-r--r--tpool/tpool.h261
-rw-r--r--tpool/tpool_generic.cc920
-rw-r--r--tpool/tpool_structs.h357
-rw-r--r--tpool/tpool_win.cc292
-rw-r--r--tpool/wait_notification.cc25
11 files changed, 2610 insertions, 0 deletions
diff --git a/tpool/CMakeLists.txt b/tpool/CMakeLists.txt
new file mode 100644
index 00000000..3e3f8e0b
--- /dev/null
+++ b/tpool/CMakeLists.txt
@@ -0,0 +1,29 @@
+INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
+IF(WIN32)
+ SET(EXTRA_SOURCES tpool_win.cc aio_win.cc)
+ELSE()
+ SET(EXTRA_SOURCES aio_linux.cc)
+ENDIF()
+
+IF(CMAKE_SYSTEM_NAME STREQUAL "Linux")
+ CHECK_INCLUDE_FILES (libaio.h HAVE_LIBAIO_H)
+ CHECK_LIBRARY_EXISTS(aio io_queue_init "" HAVE_LIBAIO)
+ IF(HAVE_LIBAIO_H AND HAVE_LIBAIO)
+ ADD_DEFINITIONS(-DLINUX_NATIVE_AIO=1)
+ LINK_LIBRARIES(aio)
+ ENDIF()
+ENDIF()
+
+ADD_LIBRARY(tpool STATIC
+ aio_simulated.cc
+ tpool_structs.h
+ CMakeLists.txt
+ tpool.h
+ tpool_generic.cc
+ task_group.cc
+ task.cc
+ wait_notification.cc
+ ${EXTRA_SOURCES}
+)
+
+INCLUDE_DIRECTORIES(${PROJECT_SOURCE_DIR}/include) \ No newline at end of file
diff --git a/tpool/aio_linux.cc b/tpool/aio_linux.cc
new file mode 100644
index 00000000..d9aa8be2
--- /dev/null
+++ b/tpool/aio_linux.cc
@@ -0,0 +1,193 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include "tpool.h"
+
+#ifdef LINUX_NATIVE_AIO
+# include <thread>
+# include <atomic>
+# include <libaio.h>
+# include <sys/syscall.h>
+
+/**
+ Invoke the io_getevents() system call, without timeout parameter.
+
+ @param ctx context from io_setup()
+ @param min_nr minimum number of completion events to wait for
+ @param nr maximum number of completion events to collect
+ @param ev the collected events
+
+ In https://pagure.io/libaio/c/7cede5af5adf01ad26155061cc476aad0804d3fc
+ the io_getevents() implementation in libaio was "optimized" so that it
+ would elide the system call when there are no outstanding requests
+ and a timeout was specified.
+
+ The libaio code for dereferencing ctx would occasionally trigger
+ SIGSEGV if io_destroy() was concurrently invoked from another thread.
+ Hence, we have to use the raw system call.
+
+ WHY are we doing this at all?
+ Because we want io_destroy() from another thread to interrupt io_getevents().
+
+ And, WHY do we want io_destroy() from another thread to interrupt
+ io_getevents()?
+
+ Because there is no documented, libaio-friendly and race-condition-free way to
+ interrupt io_getevents(). io_destroy() coupled with raw syscall seemed to work
+ for us so far.
+
+ Historical note : in the past, we used io_getevents with timeouts. We'd wake
+ up periodically, check for shutdown flag, return from the main routine.
+ This was admittedly safer, yet it did cost periodic wakeups, which we are not
+ willing to do anymore.
+
+ @note we also rely on the undocumented property, that io_destroy(ctx)
+ will make this version of io_getevents return EINVAL.
+*/
+static int my_getevents(io_context_t ctx, long min_nr, long nr, io_event *ev)
+{
+ int saved_errno= errno;
+ int ret= syscall(__NR_io_getevents, reinterpret_cast<long>(ctx),
+ min_nr, nr, ev, 0);
+ if (ret < 0)
+ {
+ ret= -errno;
+ errno= saved_errno;
+ }
+ return ret;
+}
+#endif
+
+
+/*
+ Linux AIO implementation, based on native AIO.
+ Needs libaio.h and -laio at the compile time.
+
+ io_submit() is used to submit async IO.
+
+ A single thread will collect the completion notification
+ with io_getevents() and forward io completion callback to
+ the worker threadpool.
+*/
+namespace tpool
+{
+#ifdef LINUX_NATIVE_AIO
+
+class aio_linux final : public aio
+{
+ thread_pool *m_pool;
+ io_context_t m_io_ctx;
+ std::thread m_getevent_thread;
+ static std::atomic<bool> shutdown_in_progress;
+
+ static void getevent_thread_routine(aio_linux *aio)
+ {
+ /*
+ We collect events in small batches to hopefully reduce the
+ number of system calls.
+ */
+ constexpr unsigned MAX_EVENTS= 256;
+
+ io_event events[MAX_EVENTS];
+ for (;;)
+ {
+ switch (int ret= my_getevents(aio->m_io_ctx, 1, MAX_EVENTS, events)) {
+ case -EINTR:
+ continue;
+ case -EINVAL:
+ if (shutdown_in_progress)
+ return;
+ /* fall through */
+ default:
+ if (ret < 0)
+ {
+ fprintf(stderr, "io_getevents returned %d\n", ret);
+ abort();
+ return;
+ }
+ for (int i= 0; i < ret; i++)
+ {
+ const io_event &event= events[i];
+ aiocb *iocb= static_cast<aiocb*>(event.obj);
+ if (static_cast<int>(event.res) < 0)
+ {
+ iocb->m_err= -event.res;
+ iocb->m_ret_len= 0;
+ }
+ else
+ {
+ iocb->m_ret_len= event.res;
+ iocb->m_err= 0;
+ }
+ iocb->m_internal_task.m_func= iocb->m_callback;
+ iocb->m_internal_task.m_arg= iocb;
+ iocb->m_internal_task.m_group= iocb->m_group;
+ aio->m_pool->submit_task(&iocb->m_internal_task);
+ }
+ }
+ }
+ }
+
+public:
+ aio_linux(io_context_t ctx, thread_pool *pool)
+ : m_pool(pool), m_io_ctx(ctx),
+ m_getevent_thread(getevent_thread_routine, this)
+ {
+ }
+
+ ~aio_linux()
+ {
+ shutdown_in_progress= true;
+ io_destroy(m_io_ctx);
+ m_getevent_thread.join();
+ shutdown_in_progress= false;
+ }
+
+ int submit_io(aiocb *cb) override
+ {
+ io_prep_pread(static_cast<iocb*>(cb), cb->m_fh, cb->m_buffer, cb->m_len,
+ cb->m_offset);
+ if (cb->m_opcode != aio_opcode::AIO_PREAD)
+ cb->aio_lio_opcode= IO_CMD_PWRITE;
+ iocb *icb= static_cast<iocb*>(cb);
+ int ret= io_submit(m_io_ctx, 1, &icb);
+ if (ret == 1)
+ return 0;
+ errno= -ret;
+ return -1;
+ }
+
+ int bind(native_file_handle&) override { return 0; }
+ int unbind(const native_file_handle&) override { return 0; }
+};
+
+std::atomic<bool> aio_linux::shutdown_in_progress;
+
+aio *create_linux_aio(thread_pool *pool, int max_io)
+{
+ io_context_t ctx;
+ memset(&ctx, 0, sizeof ctx);
+ if (int ret= io_setup(max_io, &ctx))
+ {
+ fprintf(stderr, "io_setup(%d) returned %d\n", max_io, ret);
+ return nullptr;
+ }
+ return new aio_linux(ctx, pool);
+}
+#else
+aio *create_linux_aio(thread_pool*, int) { return nullptr; }
+#endif
+}
diff --git a/tpool/aio_simulated.cc b/tpool/aio_simulated.cc
new file mode 100644
index 00000000..93b2ae13
--- /dev/null
+++ b/tpool/aio_simulated.cc
@@ -0,0 +1,186 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#ifndef _WIN32
+#include <unistd.h> /* pread(), pwrite() */
+#endif
+#include "tpool.h"
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <string.h>
+
+namespace tpool
+{
+#ifdef _WIN32
+
+/*
+ In order to be able to execute synchronous IO even on file opened
+ with FILE_FLAG_OVERLAPPED, and to bypass to completion port,
+ we use valid event handle for the hEvent member of the OVERLAPPED structure,
+ with its low-order bit set.
+
+ See MSDN docs for GetQueuedCompletionStatus() for description of this trick.
+*/
+static DWORD fls_sync_io= FLS_OUT_OF_INDEXES;
+HANDLE win_get_syncio_event()
+{
+ HANDLE h;
+
+ h= (HANDLE) FlsGetValue(fls_sync_io);
+ if (h)
+ {
+ return h;
+ }
+ h= CreateEventA(NULL, FALSE, FALSE, NULL);
+ /* Set low-order bit to keeps I/O completion from being queued */
+ h= (HANDLE)((uintptr_t) h | 1);
+ FlsSetValue(fls_sync_io, h);
+ return h;
+}
+#include <WinIoCtl.h>
+static void __stdcall win_free_syncio_event(void *data)
+{
+ if (data)
+ {
+ CloseHandle((HANDLE) data);
+ }
+}
+
+struct WinIoInit
+{
+ WinIoInit()
+ {
+ fls_sync_io= FlsAlloc(win_free_syncio_event);
+ if(fls_sync_io == FLS_OUT_OF_INDEXES)
+ abort();
+ }
+ ~WinIoInit() { FlsFree(fls_sync_io); }
+};
+
+static WinIoInit win_io_init;
+
+
+int pread(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset)
+{
+ OVERLAPPED ov{};
+ ULARGE_INTEGER uli;
+ uli.QuadPart= offset;
+ ov.Offset= uli.LowPart;
+ ov.OffsetHigh= uli.HighPart;
+ ov.hEvent= win_get_syncio_event();
+
+ if (ReadFile(h, buf, (DWORD) count, 0, &ov) ||
+ (GetLastError() == ERROR_IO_PENDING))
+ {
+ DWORD n_bytes;
+ if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
+ return n_bytes;
+ }
+
+ return -1;
+}
+
+int pwrite(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset)
+{
+ OVERLAPPED ov{};
+ ULARGE_INTEGER uli;
+ uli.QuadPart= offset;
+ ov.Offset= uli.LowPart;
+ ov.OffsetHigh= uli.HighPart;
+ ov.hEvent= win_get_syncio_event();
+
+ if (WriteFile(h, buf, (DWORD) count, 0, &ov) ||
+ (GetLastError() == ERROR_IO_PENDING))
+ {
+ DWORD n_bytes;
+ if (GetOverlappedResult(h, &ov, &n_bytes, TRUE))
+ return n_bytes;
+ }
+ return -1;
+}
+#endif
+
+/**
+ Simulated AIO.
+
+ Executes IO synchronously in worker pool
+ and then calls the completion routine.
+*/
+class simulated_aio : public aio
+{
+ thread_pool *m_pool;
+
+public:
+ simulated_aio(thread_pool *tp)
+ : m_pool(tp)
+ {
+ }
+
+ static void simulated_aio_callback(void *param)
+ {
+ aiocb *cb= (aiocb *) param;
+#ifdef _WIN32
+ size_t ret_len;
+#else
+ ssize_t ret_len;
+#endif
+ int err= 0;
+ switch (cb->m_opcode)
+ {
+ case aio_opcode::AIO_PREAD:
+ ret_len= pread(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
+ break;
+ case aio_opcode::AIO_PWRITE:
+ ret_len= pwrite(cb->m_fh, cb->m_buffer, cb->m_len, cb->m_offset);
+ break;
+ default:
+ abort();
+ }
+#ifdef _WIN32
+ if (static_cast<int>(ret_len) < 0)
+ err= GetLastError();
+#else
+ if (ret_len < 0)
+ err= errno;
+#endif
+ cb->m_ret_len = ret_len;
+ cb->m_err = err;
+ cb->m_internal_task.m_func= cb->m_callback;
+ thread_pool *pool= (thread_pool *)cb->m_internal;
+ pool->submit_task(&cb->m_internal_task);
+ }
+
+ virtual int submit_io(aiocb *aiocb) override
+ {
+ aiocb->m_internal_task.m_func = simulated_aio_callback;
+ aiocb->m_internal_task.m_arg = aiocb;
+ aiocb->m_internal_task.m_group = aiocb->m_group;
+ aiocb->m_internal = m_pool;
+ m_pool->submit_task(&aiocb->m_internal_task);
+ return 0;
+ }
+
+ virtual int bind(native_file_handle &fd) override { return 0; }
+ virtual int unbind(const native_file_handle &fd) override { return 0; }
+};
+
+aio *create_simulated_aio(thread_pool *tp)
+{
+ return new simulated_aio(tp);
+}
+
+} // namespace tpool
diff --git a/tpool/aio_win.cc b/tpool/aio_win.cc
new file mode 100644
index 00000000..b44f705b
--- /dev/null
+++ b/tpool/aio_win.cc
@@ -0,0 +1,139 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <algorithm>
+#include <assert.h>
+#include <condition_variable>
+#include <iostream>
+#include <limits.h>
+#include <mutex>
+#include <queue>
+#include <stack>
+#include <thread>
+#include <vector>
+#include <tpool.h>
+
+namespace tpool
+{
+
+/*
+ Windows AIO implementation, completion port based.
+ A single thread collects the completion notification with
+ GetQueuedCompletionStatus(), and forwards io completion callback
+ the worker threadpool
+*/
+class tpool_generic_win_aio : public aio
+{
+ /* Thread that does collects completion status from the completion port. */
+ std::thread m_thread;
+
+ /* IOCP Completion port.*/
+ HANDLE m_completion_port;
+
+ /* The worker pool where completion routine is executed, as task. */
+ thread_pool* m_pool;
+public:
+ tpool_generic_win_aio(thread_pool* pool, int max_io) : m_pool(pool)
+ {
+ m_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
+ m_thread = std::thread(aio_completion_thread_proc, this);
+ }
+
+ /**
+ Task to be executed in the work pool.
+ */
+ static void io_completion_task(void* data)
+ {
+ auto cb = (aiocb*)data;
+ cb->execute_callback();
+ }
+
+ void completion_thread_work()
+ {
+ for (;;)
+ {
+ DWORD n_bytes;
+ aiocb* aiocb;
+ ULONG_PTR key;
+ if (!GetQueuedCompletionStatus(m_completion_port, &n_bytes, &key,
+ (LPOVERLAPPED*)& aiocb, INFINITE))
+ break;
+
+ aiocb->m_err = 0;
+ aiocb->m_ret_len = n_bytes;
+
+ if (n_bytes != aiocb->m_len)
+ {
+ if (GetOverlappedResult(aiocb->m_fh, aiocb,
+ (LPDWORD)& aiocb->m_ret_len, FALSE))
+ {
+ aiocb->m_err = GetLastError();
+ }
+ }
+ aiocb->m_internal_task.m_func = aiocb->m_callback;
+ aiocb->m_internal_task.m_arg = aiocb;
+ aiocb->m_internal_task.m_group = aiocb->m_group;
+ m_pool->submit_task(&aiocb->m_internal_task);
+ }
+ }
+
+ static void aio_completion_thread_proc(tpool_generic_win_aio* aio)
+ {
+ aio->completion_thread_work();
+ }
+
+ ~tpool_generic_win_aio()
+ {
+ if (m_completion_port)
+ CloseHandle(m_completion_port);
+ m_thread.join();
+ }
+
+ virtual int submit_io(aiocb* cb) override
+ {
+ memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+ cb->m_internal = this;
+ ULARGE_INTEGER uli;
+ uli.QuadPart = cb->m_offset;
+ cb->Offset = uli.LowPart;
+ cb->OffsetHigh = uli.HighPart;
+
+ BOOL ok;
+ if (cb->m_opcode == aio_opcode::AIO_PREAD)
+ ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+ else
+ ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+ if (ok || (GetLastError() == ERROR_IO_PENDING))
+ return 0;
+ return -1;
+ }
+
+ // Inherited via aio
+ virtual int bind(native_file_handle& fd) override
+ {
+ return CreateIoCompletionPort(fd, m_completion_port, 0, 0) ? 0
+ : GetLastError();
+ }
+ virtual int unbind(const native_file_handle& fd) override { return 0; }
+};
+
+aio* create_win_aio(thread_pool* pool, int max_io)
+{
+ return new tpool_generic_win_aio(pool, max_io);
+}
+
+} // namespace tpool
diff --git a/tpool/task.cc b/tpool/task.cc
new file mode 100644
index 00000000..0b5253bc
--- /dev/null
+++ b/tpool/task.cc
@@ -0,0 +1,108 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+
+namespace tpool
+{
+
+#ifndef DBUG_OFF
+static callback_func_np after_task_callback;
+void set_after_task_callback(callback_func_np cb)
+{
+ after_task_callback= cb;
+}
+
+void execute_after_task_callback()
+{
+ if (after_task_callback)
+ after_task_callback();
+}
+#endif
+
+ task::task(callback_func func, void* arg, task_group* group) :
+ m_func(func), m_arg(arg), m_group(group) {}
+
+ void task::execute()
+ {
+ if (m_group)
+ {
+ /* Executing in a group (limiting concurrency).*/
+ m_group->execute(this);
+ }
+ else
+ {
+ /* Execute directly. */
+ m_func(m_arg);
+ dbug_execute_after_task_callback();
+ release();
+ }
+ }
+
+ /* Task that provide wait() operation. */
+ waitable_task::waitable_task(callback_func func, void* arg, task_group* group) :
+ task(func,arg, group),m_mtx(),m_cv(),m_ref_count(),m_waiter_count(),m_original_func(){}
+
+ void waitable_task::add_ref()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_ref_count++;
+ }
+
+ void waitable_task::release()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_ref_count--;
+ if (!m_ref_count && m_waiter_count)
+ m_cv.notify_all();
+ }
+ void waitable_task::wait(std::unique_lock<std::mutex>& lk)
+ {
+ m_waiter_count++;
+ while (m_ref_count)
+ m_cv.wait(lk);
+ m_waiter_count--;
+ }
+ void waitable_task::wait()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ wait(lk);
+ }
+
+ static void noop(void*)
+ {
+ }
+ void waitable_task::disable()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_func == noop)
+ return;
+ wait(lk);
+ m_original_func = m_func;
+ m_func = noop;
+ }
+ void waitable_task::enable()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if(m_func != noop)
+ return;
+ wait(lk);
+ m_func = m_original_func;
+ }
+}
diff --git a/tpool/task_group.cc b/tpool/task_group.cc
new file mode 100644
index 00000000..97fbb091
--- /dev/null
+++ b/tpool/task_group.cc
@@ -0,0 +1,100 @@
+/* Copyright(C) 2019 MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include <tpool.h>
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+#include <tpool_structs.h>
+#include <thread>
+#include <assert.h>
+#ifndef _WIN32
+#include <unistd.h> // usleep
+#endif
+namespace tpool
+{
+ task_group::task_group(unsigned int max_concurrency) :
+ m_queue(8),
+ m_mtx(),
+ m_tasks_running(),
+ m_max_concurrent_tasks(max_concurrency)
+ {};
+
+ void task_group::set_max_tasks(unsigned int max_concurrency)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_max_concurrent_tasks = max_concurrency;
+ }
+ void task_group::execute(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_tasks_running == m_max_concurrent_tasks)
+ {
+ /* Queue for later execution by another thread.*/
+ m_queue.push(t);
+ return;
+ }
+ m_tasks_running++;
+ for (;;)
+ {
+ lk.unlock();
+ if (t)
+ {
+ t->m_func(t->m_arg);
+ dbug_execute_after_task_callback();
+ t->release();
+ }
+ lk.lock();
+
+ if (m_queue.empty())
+ break;
+ t = m_queue.front();
+ m_queue.pop();
+ }
+ m_tasks_running--;
+ }
+
+ void task_group::cancel_pending(task* t)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!t)
+ m_queue.clear();
+ for (auto it = m_queue.begin(); it != m_queue.end(); it++)
+ {
+ if (*it == t)
+ {
+ (*it)->release();
+ (*it) = nullptr;
+ }
+ }
+ }
+
+ task_group::~task_group()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ assert(m_queue.empty());
+
+ while (m_tasks_running)
+ {
+ lk.unlock();
+#ifndef _WIN32
+ usleep(1000);
+#else
+ Sleep(1);
+#endif
+ lk.lock();
+ }
+ }
+}
diff --git a/tpool/tpool.h b/tpool/tpool.h
new file mode 100644
index 00000000..3a5658c0
--- /dev/null
+++ b/tpool/tpool.h
@@ -0,0 +1,261 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#pragma once
+#include <memory> /* unique_ptr */
+#include <condition_variable>
+#include <mutex>
+#include <atomic>
+#include <tpool_structs.h>
+#ifdef LINUX_NATIVE_AIO
+#include <libaio.h>
+#endif
+#ifdef _WIN32
+#ifndef NOMINMAX
+#define NOMINMAX
+#endif
+#include <windows.h>
+/**
+ Windows-specific native file handle struct.
+ Apart from the actual handle, contains PTP_IO
+ used by the Windows threadpool.
+*/
+struct native_file_handle
+{
+ HANDLE m_handle;
+ PTP_IO m_ptp_io;
+ native_file_handle(){};
+ native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {}
+ operator HANDLE() const { return m_handle; }
+};
+#else
+#include <unistd.h>
+typedef int native_file_handle;
+#endif
+
+namespace tpool
+{
+/**
+ Task callback function
+ */
+typedef void (*callback_func)(void *);
+typedef void (*callback_func_np)(void);
+class task;
+
+/** A class that can be used e.g. for
+restricting concurrency for specific class of tasks. */
+
+class task_group
+{
+private:
+ circular_queue<task*> m_queue;
+ std::mutex m_mtx;
+ std::condition_variable m_cv;
+ unsigned int m_tasks_running;
+ unsigned int m_max_concurrent_tasks;
+public:
+ task_group(unsigned int max_concurrency= 100000);
+ void set_max_tasks(unsigned int max_concurrent_tasks);
+ void execute(task* t);
+ void cancel_pending(task *t);
+ ~task_group();
+};
+
+
+class task
+{
+public:
+ callback_func m_func;
+ void *m_arg;
+ task_group* m_group;
+ virtual void add_ref() {};
+ virtual void release() {};
+ task() {};
+ task(callback_func func, void* arg, task_group* group = nullptr);
+ void* get_arg() { return m_arg; }
+ callback_func get_func() { return m_func; }
+ virtual void execute();
+ virtual ~task() {}
+};
+
+class waitable_task :public task
+{
+ std::mutex m_mtx;
+ std::condition_variable m_cv;
+ int m_ref_count;
+ int m_waiter_count;
+ callback_func m_original_func;
+ void wait(std::unique_lock<std::mutex>&lk);
+public:
+ waitable_task(callback_func func, void* arg, task_group* group = nullptr);
+ void add_ref() override;
+ void release() override;
+ TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; }
+ TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;}
+ void wait();
+ void disable();
+ void enable();
+ virtual ~waitable_task() {};
+};
+enum class aio_opcode
+{
+ AIO_PREAD,
+ AIO_PWRITE
+};
+constexpr size_t MAX_AIO_USERDATA_LEN= 3 * sizeof(void*);
+
+/** IO control block, includes parameters for the IO, and the callback*/
+
+struct aiocb
+#ifdef _WIN32
+ :OVERLAPPED
+#elif defined LINUX_NATIVE_AIO
+ :iocb
+#endif
+{
+ native_file_handle m_fh;
+ aio_opcode m_opcode;
+ unsigned long long m_offset;
+ void *m_buffer;
+ unsigned int m_len;
+ callback_func m_callback;
+ task_group* m_group;
+ /* Returned length and error code*/
+ size_t m_ret_len;
+ int m_err;
+ void *m_internal;
+ task m_internal_task;
+ alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN];
+
+ aiocb() : m_internal_task(nullptr, nullptr)
+ {}
+ void execute_callback()
+ {
+ task t(m_callback, this,m_group);
+ t.execute();
+ }
+};
+
+
+/**
+ AIO interface
+*/
+class aio
+{
+public:
+ /**
+ Submit asyncronous IO.
+ On completion, cb->m_callback is executed.
+ */
+ virtual int submit_io(aiocb *cb)= 0;
+ /** "Bind" file to AIO handler (used on Windows only) */
+ virtual int bind(native_file_handle &fd)= 0;
+ /** "Unind" file to AIO handler (used on Windows only) */
+ virtual int unbind(const native_file_handle &fd)= 0;
+ virtual ~aio(){};
+};
+
+class timer
+{
+public:
+ virtual void set_time(int initial_delay_ms, int period_ms) = 0;
+ virtual void disarm() = 0;
+ virtual ~timer(){}
+};
+
+class thread_pool;
+
+extern aio *create_simulated_aio(thread_pool *tp);
+
+#ifndef DBUG_OFF
+/*
+ This function is useful for debugging to make sure all mutexes are released
+ inside a task callback
+*/
+void set_after_task_callback(callback_func_np cb);
+void execute_after_task_callback();
+#define dbug_execute_after_task_callback() execute_after_task_callback()
+#else
+#define dbug_execute_after_task_callback() do{}while(0)
+#endif
+
+class thread_pool
+{
+protected:
+ /* AIO handler */
+ std::unique_ptr<aio> m_aio;
+ virtual aio *create_native_aio(int max_io)= 0;
+
+ /**
+ Functions to be called at worker thread start/end
+ can be used for example to set some TLS variables
+ */
+ void (*m_worker_init_callback)(void);
+ void (*m_worker_destroy_callback)(void);
+
+public:
+ thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback()
+ {
+ }
+ virtual void submit_task(task *t)= 0;
+ virtual timer* create_timer(callback_func func, void *data=nullptr) = 0;
+ void set_thread_callbacks(void (*init)(), void (*destroy)())
+ {
+ m_worker_init_callback= init;
+ m_worker_destroy_callback= destroy;
+ }
+ int configure_aio(bool use_native_aio, int max_io)
+ {
+ if (use_native_aio)
+ m_aio.reset(create_native_aio(max_io));
+ else
+ m_aio.reset(create_simulated_aio(this));
+ return !m_aio ? -1 : 0;
+ }
+ void disable_aio()
+ {
+ m_aio.reset();
+ }
+ int bind(native_file_handle &fd) { return m_aio->bind(fd); }
+ void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); }
+ int submit_io(aiocb *cb) { return m_aio->submit_io(cb); }
+ virtual void wait_begin() {};
+ virtual void wait_end() {};
+ virtual ~thread_pool() {}
+};
+const int DEFAULT_MIN_POOL_THREADS= 1;
+const int DEFAULT_MAX_POOL_THREADS= 500;
+extern thread_pool *
+create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS,
+ int max_threads= DEFAULT_MAX_POOL_THREADS);
+extern "C" void tpool_wait_begin();
+extern "C" void tpool_wait_end();
+#ifdef _WIN32
+extern thread_pool *
+create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS,
+ int max_threads= DEFAULT_MAX_POOL_THREADS);
+
+/*
+ Helper functions, to execute pread/pwrite even if file is
+ opened with FILE_FLAG_OVERLAPPED, and bound to completion
+ port.
+*/
+int pwrite(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset);
+int pread(const native_file_handle &h, void *buf, size_t count,
+ unsigned long long offset);
+HANDLE win_get_syncio_event();
+#endif
+} // namespace tpool
diff --git a/tpool/tpool_generic.cc b/tpool/tpool_generic.cc
new file mode 100644
index 00000000..7c645b09
--- /dev/null
+++ b/tpool/tpool_generic.cc
@@ -0,0 +1,920 @@
+/* Copyright (C) 2019, 2020, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <limits.h>
+#include <algorithm>
+#include <assert.h>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <iostream>
+#include <limits.h>
+#include <mutex>
+#include <queue>
+#include <stack>
+#include <thread>
+#include <vector>
+#include "tpool.h"
+#include <assert.h>
+#include <my_global.h>
+#include <my_dbug.h>
+#include <thr_timer.h>
+#include <stdlib.h>
+
+namespace tpool
+{
+
+#ifdef __linux__
+ extern aio* create_linux_aio(thread_pool* tp, int max_io);
+#endif
+#ifdef _WIN32
+ extern aio* create_win_aio(thread_pool* tp, int max_io);
+#endif
+
+ static const std::chrono::milliseconds LONG_TASK_DURATION = std::chrono::milliseconds(500);
+ static const int OVERSUBSCRIBE_FACTOR = 2;
+
+/**
+ Implementation of generic threadpool.
+ This threadpool consists of the following components
+
+ - The task queue. This queue is populated by submit()
+ - Worker that execute the work items.
+ - Timer thread that takes care of pool health
+
+ The task queue is populated by submit() method.
+ on submit(), a worker thread can be woken, or created
+ to execute tasks.
+
+ The timer thread watches if work items are being dequeued, and if not,
+ this can indicate potential deadlock.
+ Thus the timer thread can also wake or create a thread, to ensure some progress.
+
+ Optimizations:
+
+ - worker threads that are idle for long time will shutdown.
+ - worker threads are woken in LIFO order, which minimizes context switching
+ and also ensures that idle timeout works well. LIFO wakeup order ensures
+ that active threads stay active, and idle ones stay idle.
+
+*/
+
+/**
+ Worker wakeup flags.
+*/
+enum worker_wake_reason
+{
+ WAKE_REASON_NONE,
+ WAKE_REASON_TASK,
+ WAKE_REASON_SHUTDOWN
+};
+
+
+
+/* A per-worker thread structure.*/
+struct MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) worker_data
+{
+ /** Condition variable to wakeup this worker.*/
+ std::condition_variable m_cv;
+
+ /** Reason why worker was woken. */
+ worker_wake_reason m_wake_reason;
+
+ /**
+ If worker wakes up with WAKE_REASON_TASK, this the task it needs to execute.
+ */
+ task* m_task;
+
+ /** Struct is member of intrusive doubly linked list */
+ worker_data* m_prev;
+ worker_data* m_next;
+
+ /* Current state of the worker.*/
+ enum state
+ {
+ NONE = 0,
+ EXECUTING_TASK = 1,
+ LONG_TASK = 2,
+ WAITING = 4
+ };
+
+ int m_state;
+
+ bool is_executing_task()
+ {
+ return m_state & EXECUTING_TASK;
+ }
+ bool is_long_task()
+ {
+ return m_state & LONG_TASK;
+ }
+ bool is_waiting()
+ {
+ return m_state & WAITING;
+ }
+ std::chrono::system_clock::time_point m_task_start_time;
+ worker_data() :
+ m_cv(),
+ m_wake_reason(WAKE_REASON_NONE),
+ m_task(),
+ m_prev(),
+ m_next(),
+ m_state(NONE),
+ m_task_start_time()
+ {}
+
+ /*Define custom new/delete because of overaligned structure. */
+ void* operator new(size_t size)
+ {
+#ifdef _WIN32
+ return _aligned_malloc(size, CPU_LEVEL1_DCACHE_LINESIZE);
+#else
+ void* ptr;
+ int ret = posix_memalign(&ptr, CPU_LEVEL1_DCACHE_LINESIZE, size);
+ return ret ? 0 : ptr;
+#endif
+ }
+ void operator delete(void* p)
+ {
+#ifdef _WIN32
+ _aligned_free(p);
+#else
+ free(p);
+#endif
+ }
+};
+
+
+static thread_local worker_data* tls_worker_data;
+
+class thread_pool_generic : public thread_pool
+{
+ /** Cache for per-worker structures */
+ cache<worker_data> m_thread_data_cache;
+
+ /** The task queue */
+ circular_queue<task*> m_task_queue;
+
+ /** List of standby (idle) workers */
+ doubly_linked_list<worker_data> m_standby_threads;
+
+ /** List of threads that are executing tasks */
+ doubly_linked_list<worker_data> m_active_threads;
+
+ /* Mutex that protects the whole struct, most importantly
+ the standby threads list, and task queue */
+ std::mutex m_mtx;
+
+ /** Timeout after which idle worker shuts down */
+ std::chrono::milliseconds m_thread_timeout;
+
+ /** How often should timer wakeup.*/
+ std::chrono::milliseconds m_timer_interval;
+
+ /** Another condition variable, used in pool shutdown */
+ std::condition_variable m_cv_no_threads;
+
+ /** Condition variable for the timer thread. Signaled on shutdown. */
+ std::condition_variable m_cv_timer;
+
+ /** Overall number of enqueues*/
+ unsigned long long m_tasks_enqueued;
+ unsigned long long m_group_enqueued;
+ /** Overall number of dequeued tasks. */
+ unsigned long long m_tasks_dequeued;
+
+ /** Statistic related, number of worker thread wakeups */
+ int m_wakeups;
+
+ /**
+ Statistic related, number of spurious thread wakeups
+ (i.e thread woke up, and the task queue is empty)
+ */
+ int m_spurious_wakeups;
+
+ /** The desired concurrency. This number of workers should be
+ actively executing. */
+ unsigned int m_concurrency;
+
+ /** True, if threadpool is being shutdown, false otherwise */
+ bool m_in_shutdown;
+
+ /** Maintenance timer state : true = active(ON),false = inactive(OFF)*/
+ enum class timer_state_t
+ {
+ OFF, ON
+ };
+ timer_state_t m_timer_state= timer_state_t::OFF;
+ void switch_timer(timer_state_t state);
+
+ /* Updates idle_since, and maybe switches the timer off */
+ void check_idle(std::chrono::system_clock::time_point now);
+
+ /** time point when timer last ran, used as a coarse clock. */
+ std::chrono::system_clock::time_point m_timestamp;
+
+ /** Number of long running tasks. The long running tasks are excluded when
+ adjusting concurrency */
+ unsigned int m_long_tasks_count;
+
+ unsigned int m_waiting_task_count;
+
+ /** Last time thread was created*/
+ std::chrono::system_clock::time_point m_last_thread_creation;
+
+ /** Minimumum number of threads in this pool.*/
+ unsigned int m_min_threads;
+
+ /** Maximimum number of threads in this pool. */
+ unsigned int m_max_threads;
+
+ /* maintenance related statistics (see maintenance()) */
+ size_t m_last_thread_count;
+ unsigned long long m_last_activity;
+
+ void worker_main(worker_data *thread_data);
+ void worker_end(worker_data* thread_data);
+
+ /* Checks threadpool responsiveness, adjusts thread_counts */
+ void maintenance();
+ static void maintenance_func(void* arg)
+ {
+ ((thread_pool_generic *)arg)->maintenance();
+ }
+ bool add_thread();
+ bool wake(worker_wake_reason reason, task *t = nullptr);
+ void maybe_wake_or_create_thread();
+ bool too_many_active_threads();
+ bool get_task(worker_data *thread_var, task **t);
+ bool wait_for_tasks(std::unique_lock<std::mutex> &lk,
+ worker_data *thread_var);
+ void cancel_pending(task* t);
+
+ size_t thread_count()
+ {
+ return m_active_threads.size() + m_standby_threads.size();
+ }
+public:
+ thread_pool_generic(int min_threads, int max_threads);
+ ~thread_pool_generic();
+ void wait_begin() override;
+ void wait_end() override;
+ void submit_task(task *task) override;
+ virtual aio *create_native_aio(int max_io) override
+ {
+#ifdef _WIN32
+ return create_win_aio(this, max_io);
+#elif defined(__linux__)
+ return create_linux_aio(this,max_io);
+#else
+ return nullptr;
+#endif
+ }
+
+ class timer_generic : public thr_timer_t, public timer
+ {
+ thread_pool_generic* m_pool;
+ waitable_task m_task;
+ callback_func m_callback;
+ void* m_data;
+ int m_period;
+ std::mutex m_mtx;
+ bool m_on;
+ std::atomic<bool> m_running;
+
+ void run()
+ {
+ /*
+ In rare cases, multiple callbacks can be scheduled,
+ e.g with set_time(0,0) in a loop.
+ We do not allow parallel execution, as user is not prepared.
+ */
+ bool expected = false;
+ if (!m_running.compare_exchange_strong(expected, true))
+ return;
+
+ m_callback(m_data);
+ dbug_execute_after_task_callback();
+ m_running = false;
+
+ if (m_pool && m_period)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_on)
+ {
+ DBUG_PUSH_EMPTY;
+ thr_timer_end(this);
+ thr_timer_settime(this, 1000ULL * m_period);
+ DBUG_POP_EMPTY;
+ }
+ }
+ }
+
+ static void execute(void* arg)
+ {
+ auto timer = (timer_generic*)arg;
+ timer->run();
+ }
+
+ static void submit_task(void* arg)
+ {
+ timer_generic* timer = (timer_generic*)arg;
+ timer->m_pool->submit_task(&timer->m_task);
+ }
+
+ public:
+ timer_generic(callback_func func, void* data, thread_pool_generic * pool):
+ m_pool(pool),
+ m_task(timer_generic::execute,this),
+ m_callback(func),m_data(data),m_period(0),m_mtx(),
+ m_on(true),m_running()
+ {
+ if (pool)
+ {
+ /* EXecute callback in threadpool*/
+ thr_timer_init(this, submit_task, this);
+ }
+ else
+ {
+ /* run in "timer" thread */
+ thr_timer_init(this, m_task.get_func(), m_task.get_arg());
+ }
+ }
+
+ void set_time(int initial_delay_ms, int period_ms) override
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!m_on)
+ return;
+ thr_timer_end(this);
+ if (!m_pool)
+ thr_timer_set_period(this, 1000ULL * period_ms);
+ else
+ m_period = period_ms;
+ thr_timer_settime(this, 1000ULL * initial_delay_ms);
+ }
+
+ /*
+ Change only period of a periodic timer
+ (after the next execution). Workarounds
+ mysys timer deadlocks
+ */
+ void set_period(int period_ms)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (!m_on)
+ return;
+ if (!m_pool)
+ thr_timer_set_period(this, 1000ULL * period_ms);
+ else
+ m_period = period_ms;
+ }
+
+ void disarm() override
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_on = false;
+ thr_timer_end(this);
+ lk.unlock();
+
+ if (m_task.m_group)
+ {
+ m_task.m_group->cancel_pending(&m_task);
+ }
+ if (m_pool)
+ {
+ m_pool->cancel_pending(&m_task);
+ }
+ m_task.wait();
+ }
+
+ virtual ~timer_generic()
+ {
+ disarm();
+ }
+ };
+ timer_generic m_maintenance_timer;
+ virtual timer* create_timer(callback_func func, void *data) override
+ {
+ return new timer_generic(func, data, this);
+ }
+};
+
+void thread_pool_generic::cancel_pending(task* t)
+{
+ std::unique_lock <std::mutex> lk(m_mtx);
+ for (auto it = m_task_queue.begin(); it != m_task_queue.end(); it++)
+ {
+ if (*it == t)
+ {
+ t->release();
+ *it = nullptr;
+ }
+ }
+}
+/**
+ Register worker in standby list, and wait to be woken.
+
+ @retval true if thread was woken
+ @retval false idle wait timeout exceeded (the current thread must shutdown)
+*/
+bool thread_pool_generic::wait_for_tasks(std::unique_lock<std::mutex> &lk,
+ worker_data *thread_data)
+{
+ assert(m_task_queue.empty());
+ assert(!m_in_shutdown);
+
+ thread_data->m_wake_reason= WAKE_REASON_NONE;
+ m_active_threads.erase(thread_data);
+ m_standby_threads.push_back(thread_data);
+
+ for (;;)
+ {
+ thread_data->m_cv.wait_for(lk, m_thread_timeout);
+
+ if (thread_data->m_wake_reason != WAKE_REASON_NONE)
+ {
+ /* Woke up not due to timeout.*/
+ return true;
+ }
+
+ if (thread_count() <= m_min_threads)
+ {
+ /* Do not shutdown thread, maintain required minimum of worker
+ threads.*/
+ continue;
+ }
+
+ /*
+ Woke up due to timeout, remove this thread's from the standby list. In
+ all other cases where it is signaled it is removed by the signaling
+ thread.
+ */
+ m_standby_threads.erase(thread_data);
+ m_active_threads.push_back(thread_data);
+ return false;
+ }
+}
+
+
+/**
+ Workers "get next task" routine.
+
+ A task can be handed over to the current thread directly during submit().
+ if thread_var->m_wake_reason == WAKE_REASON_TASK.
+
+ Or a task can be taken from the task queue.
+ In case task queue is empty, the worker thread will park (wait for wakeup).
+*/
+bool thread_pool_generic::get_task(worker_data *thread_var, task **t)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+
+ if (thread_var->is_long_task())
+ {
+ DBUG_ASSERT(m_long_tasks_count);
+ m_long_tasks_count--;
+ }
+ DBUG_ASSERT(!thread_var->is_waiting());
+ thread_var->m_state = worker_data::NONE;
+
+ while (m_task_queue.empty())
+ {
+ if (m_in_shutdown)
+ return false;
+
+ if (!wait_for_tasks(lk, thread_var))
+ return false;
+ if (m_task_queue.empty())
+ {
+ m_spurious_wakeups++;
+ continue;
+ }
+ }
+
+ /* Dequeue from the task queue.*/
+ *t= m_task_queue.front();
+ m_task_queue.pop();
+ m_tasks_dequeued++;
+ thread_var->m_state |= worker_data::EXECUTING_TASK;
+ thread_var->m_task_start_time = m_timestamp;
+ return true;
+}
+
+/** Worker thread shutdown routine. */
+void thread_pool_generic::worker_end(worker_data* thread_data)
+{
+ std::lock_guard<std::mutex> lk(m_mtx);
+ DBUG_ASSERT(!thread_data->is_long_task());
+ m_active_threads.erase(thread_data);
+ m_thread_data_cache.put(thread_data);
+
+ if (!thread_count() && m_in_shutdown)
+ {
+ /* Signal the destructor that no more threads are left. */
+ m_cv_no_threads.notify_all();
+ }
+}
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool);
+
+/* The worker get/execute task loop.*/
+void thread_pool_generic::worker_main(worker_data *thread_var)
+{
+ task* task;
+ set_tls_pool(this);
+ if(m_worker_init_callback)
+ m_worker_init_callback();
+
+ tls_worker_data = thread_var;
+
+ while (get_task(thread_var, &task) && task)
+ {
+ task->execute();
+ }
+
+ if (m_worker_destroy_callback)
+ m_worker_destroy_callback();
+
+ worker_end(thread_var);
+}
+
+
+/*
+ Check if threadpool had been idle for a while
+ Switch off maintenance timer if it is in idle state
+ for too long.
+
+ Helper function, to be used inside maintenance callback,
+ before m_last_activity is updated
+*/
+
+static const auto invalid_timestamp= std::chrono::system_clock::time_point::max();
+constexpr auto max_idle_time= std::chrono::minutes(1);
+
+/* Time since maintenance timer had nothing to do */
+static std::chrono::system_clock::time_point idle_since= invalid_timestamp;
+void thread_pool_generic::check_idle(std::chrono::system_clock::time_point now)
+{
+ DBUG_ASSERT(m_task_queue.empty());
+
+ /*
+ We think that there is no activity, if there were at most 2 tasks
+ since last time, and there is a spare thread.
+ The 2 tasks (and not 0) is to account for some periodic timers.
+ */
+ bool idle= m_standby_threads.m_count > 0;
+
+ if (!idle)
+ {
+ idle_since= invalid_timestamp;
+ return;
+ }
+
+ if (idle_since == invalid_timestamp)
+ {
+ idle_since= now;
+ return;
+ }
+
+ /* Switch timer off after 1 minute of idle time */
+ if (now - idle_since > max_idle_time)
+ {
+ idle_since= invalid_timestamp;
+ switch_timer(timer_state_t::OFF);
+ }
+}
+
+
+/*
+ Periodic job to fix thread count and concurrency,
+ in case of long tasks, etc
+*/
+void thread_pool_generic::maintenance()
+{
+ /*
+ If pool is busy (i.e the its mutex is currently locked), we can
+ skip the maintenance task, some times, to lower mutex contention
+ */
+ static int skip_counter;
+ const int MAX_SKIPS = 10;
+ std::unique_lock<std::mutex> lk(m_mtx, std::defer_lock);
+ if (skip_counter == MAX_SKIPS)
+ {
+ lk.lock();
+ }
+ else if (!lk.try_lock())
+ {
+ skip_counter++;
+ return;
+ }
+
+ skip_counter = 0;
+
+ m_timestamp = std::chrono::system_clock::now();
+
+ if (m_task_queue.empty())
+ {
+ check_idle(m_timestamp);
+ m_last_activity = m_tasks_dequeued + m_wakeups;
+ return;
+ }
+
+ m_long_tasks_count = 0;
+ for (auto thread_data = m_active_threads.front();
+ thread_data;
+ thread_data = thread_data->m_next)
+ {
+ if (thread_data->is_executing_task() &&
+ !thread_data->is_waiting() &&
+ (thread_data->is_long_task()
+ || (m_timestamp - thread_data->m_task_start_time > LONG_TASK_DURATION)))
+ {
+ thread_data->m_state |= worker_data::LONG_TASK;
+ m_long_tasks_count++;
+ }
+ }
+
+ maybe_wake_or_create_thread();
+
+ size_t thread_cnt = (int)thread_count();
+ if (m_last_activity == m_tasks_dequeued + m_wakeups &&
+ m_last_thread_count <= thread_cnt && m_active_threads.size() == thread_cnt)
+ {
+ // no progress made since last iteration. create new
+ // thread
+ add_thread();
+ }
+ m_last_activity = m_tasks_dequeued + m_wakeups;
+ m_last_thread_count= thread_cnt;
+}
+
+/*
+ Heuristic used for thread creation throttling.
+ Returns interval in milliseconds between thread creation
+ (depending on number of threads already in the pool, and
+ desired concurrency level)
+*/
+static int throttling_interval_ms(size_t n_threads,size_t concurrency)
+{
+ if (n_threads < concurrency*4)
+ return 0;
+
+ if (n_threads < concurrency*8)
+ return 50;
+
+ if (n_threads < concurrency*16)
+ return 100;
+
+ return 200;
+}
+
+/* Create a new worker.*/
+bool thread_pool_generic::add_thread()
+{
+ size_t n_threads = thread_count();
+
+ if (n_threads >= m_max_threads)
+ return false;
+
+ if (n_threads >= m_min_threads)
+ {
+ auto now = std::chrono::system_clock::now();
+ if (now - m_last_thread_creation <
+ std::chrono::milliseconds(throttling_interval_ms(n_threads, m_concurrency)))
+ {
+ /*
+ Throttle thread creation and wakeup deadlock detection timer,
+ if is it off.
+ */
+ switch_timer(timer_state_t::ON);
+
+ return false;
+ }
+ }
+
+ worker_data *thread_data = m_thread_data_cache.get();
+ m_active_threads.push_back(thread_data);
+ try
+ {
+ std::thread thread(&thread_pool_generic::worker_main, this, thread_data);
+ m_last_thread_creation = std::chrono::system_clock::now();
+ thread.detach();
+ }
+ catch (std::system_error& e)
+ {
+ m_active_threads.erase(thread_data);
+ m_thread_data_cache.put(thread_data);
+ static bool warning_written;
+ if (!warning_written)
+ {
+ fprintf(stderr, "Warning : threadpool thread could not be created :%s,"
+ "current number of threads in pool %zu\n", e.what(), thread_count());
+ warning_written = true;
+ }
+ return false;
+ }
+ return true;
+}
+
+/** Wake a standby thread, and hand the given task over to this thread. */
+bool thread_pool_generic::wake(worker_wake_reason reason, task *)
+{
+ assert(reason != WAKE_REASON_NONE);
+
+ if (m_standby_threads.empty())
+ return false;
+ auto var= m_standby_threads.back();
+ m_standby_threads.pop_back();
+ m_active_threads.push_back(var);
+ assert(var->m_wake_reason == WAKE_REASON_NONE);
+ var->m_wake_reason= reason;
+ var->m_cv.notify_one();
+ m_wakeups++;
+ return true;
+}
+
+
+thread_pool_generic::thread_pool_generic(int min_threads, int max_threads) :
+ m_thread_data_cache(max_threads),
+ m_task_queue(10000),
+ m_standby_threads(),
+ m_active_threads(),
+ m_mtx(),
+ m_thread_timeout(std::chrono::milliseconds(60000)),
+ m_timer_interval(std::chrono::milliseconds(400)),
+ m_cv_no_threads(),
+ m_cv_timer(),
+ m_tasks_enqueued(),
+ m_tasks_dequeued(),
+ m_wakeups(),
+ m_spurious_wakeups(),
+ m_concurrency(std::thread::hardware_concurrency()*2),
+ m_in_shutdown(),
+ m_timestamp(),
+ m_long_tasks_count(),
+ m_waiting_task_count(),
+ m_last_thread_creation(),
+ m_min_threads(min_threads),
+ m_max_threads(max_threads),
+ m_last_thread_count(),
+ m_last_activity(),
+ m_maintenance_timer(thread_pool_generic::maintenance_func, this, nullptr)
+{
+
+ if (m_max_threads < m_concurrency)
+ m_concurrency = m_max_threads;
+ if (m_min_threads > m_concurrency)
+ m_concurrency = min_threads;
+ if (!m_concurrency)
+ m_concurrency = 1;
+
+ // start the timer
+ m_maintenance_timer.set_time(0, (int)m_timer_interval.count());
+}
+
+
+void thread_pool_generic::maybe_wake_or_create_thread()
+{
+ if (m_task_queue.empty())
+ return;
+ DBUG_ASSERT(m_active_threads.size() >= static_cast<size_t>(m_long_tasks_count + m_waiting_task_count));
+ if (m_active_threads.size() - m_long_tasks_count - m_waiting_task_count > m_concurrency)
+ return;
+ if (!m_standby_threads.empty())
+ {
+ wake(WAKE_REASON_TASK);
+ }
+ else
+ {
+ add_thread();
+ }
+}
+
+bool thread_pool_generic::too_many_active_threads()
+{
+ return m_active_threads.size() - m_long_tasks_count - m_waiting_task_count >
+ m_concurrency* OVERSUBSCRIBE_FACTOR;
+}
+
+/** Submit a new task*/
+void thread_pool_generic::submit_task(task* task)
+{
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (m_in_shutdown)
+ return;
+ task->add_ref();
+ m_tasks_enqueued++;
+ m_task_queue.push(task);
+ maybe_wake_or_create_thread();
+}
+
+
+/* Notify thread pool that current thread is going to wait */
+void thread_pool_generic::wait_begin()
+{
+ if (!tls_worker_data || tls_worker_data->is_long_task())
+ return;
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if(tls_worker_data->is_long_task())
+ {
+ /*
+ Current task flag could have become "long-running"
+ while waiting for the lock, thus recheck.
+ */
+ return;
+ }
+ DBUG_ASSERT(!tls_worker_data->is_waiting());
+ tls_worker_data->m_state |= worker_data::WAITING;
+ m_waiting_task_count++;
+
+ /* Maintain concurrency */
+ maybe_wake_or_create_thread();
+}
+
+
+void thread_pool_generic::wait_end()
+{
+ if (tls_worker_data && tls_worker_data->is_waiting())
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ tls_worker_data->m_state &= ~worker_data::WAITING;
+ m_waiting_task_count--;
+ }
+}
+
+
+void thread_pool_generic::switch_timer(timer_state_t state)
+{
+ if (m_timer_state == state)
+ return;
+ /*
+ We can't use timer::set_time, because mysys timers are deadlock
+ prone.
+
+ Instead, to switch off we increase the timer period
+ and decrease period to switch on.
+
+ This might introduce delays in thread creation when needed,
+ as period will only be changed when timer fires next time.
+ For this reason, we can't use very long periods for the "off" state.
+ */
+ m_timer_state= state;
+ long long period= (state == timer_state_t::OFF) ?
+ m_timer_interval.count()*10: m_timer_interval.count();
+
+ m_maintenance_timer.set_period((int)period);
+}
+
+
+/**
+ Wake up all workers, and wait until they are gone
+ Stop the timer.
+*/
+thread_pool_generic::~thread_pool_generic()
+{
+ /*
+ Stop AIO early.
+ This is needed to prevent AIO completion thread
+ from calling submit_task() on an object that is being destroyed.
+ */
+ m_aio.reset();
+
+ /* Also stop the maintanence task early. */
+ m_maintenance_timer.disarm();
+
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_in_shutdown= true;
+
+ /* Wake up idle threads. */
+ while (wake(WAKE_REASON_SHUTDOWN))
+ {
+ }
+
+ while (thread_count())
+ {
+ m_cv_no_threads.wait(lk);
+ }
+
+ lk.unlock();
+}
+
+thread_pool *create_thread_pool_generic(int min_threads, int max_threads)
+{
+ return new thread_pool_generic(min_threads, max_threads);
+}
+
+} // namespace tpool
diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h
new file mode 100644
index 00000000..7b0fb857
--- /dev/null
+++ b/tpool/tpool_structs.h
@@ -0,0 +1,357 @@
+/* Copyright(C) 2019 MariaDB Corporation
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#pragma once
+#include <vector>
+#include <stack>
+#include <mutex>
+#include <condition_variable>
+#include <assert.h>
+#include <algorithm>
+
+
+/* Suppress TSAN warnings, that we believe are not critical. */
+#if defined(__has_feature)
+#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
+#else
+#define TPOOL_HAS_FEATURE(...) 0
+#endif
+
+#if TPOOL_HAS_FEATURE(address_sanitizer)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize("thread"),noinline))
+#elif defined(__GNUC__) && defined (__SANITIZE_THREAD__)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize_thread,noinline))
+#else
+#define TPOOL_SUPPRESS_TSAN
+#endif
+
+namespace tpool
+{
+
+enum cache_notification_mode
+{
+ NOTIFY_ONE,
+ NOTIFY_ALL
+};
+
+/**
+ Generic "pointer" cache of a fixed size
+ with fast put/get operations.
+
+ Compared to STL containers, is faster/does not
+ do allocations. However, put() operation will wait
+ if there is no free items.
+*/
+template<typename T> class cache
+{
+ std::mutex m_mtx;
+ std::condition_variable m_cv;
+ std::vector<T> m_base;
+ std::vector<T*> m_cache;
+ cache_notification_mode m_notification_mode;
+ int m_waiters;
+
+ bool is_full()
+ {
+ return m_cache.size() == m_base.size();
+ }
+
+public:
+ cache(size_t count, cache_notification_mode mode= tpool::cache_notification_mode::NOTIFY_ALL):
+ m_mtx(), m_cv(), m_base(count),m_cache(count), m_notification_mode(mode),m_waiters()
+ {
+ for(size_t i = 0 ; i < count; i++)
+ m_cache[i]=&m_base[i];
+ }
+
+ T* get(bool blocking=true)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ if (blocking)
+ {
+ while(m_cache.empty())
+ m_cv.wait(lk);
+ }
+ else
+ {
+ if(m_cache.empty())
+ return nullptr;
+ }
+ T* ret = m_cache.back();
+ m_cache.pop_back();
+ return ret;
+ }
+
+
+ void put(T *ele)
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_cache.push_back(ele);
+ if (m_notification_mode == NOTIFY_ONE)
+ m_cv.notify_one();
+ else if(m_cache.size() == 1)
+ m_cv.notify_all(); // Signal cache is not empty
+ else if(m_waiters && is_full())
+ m_cv.notify_all(); // Signal cache is full
+ }
+
+ bool contains(T* ele)
+ {
+ return ele >= &m_base[0] && ele <= &m_base[m_base.size() -1];
+ }
+
+ /* Wait until cache is full.*/
+ void wait()
+ {
+ std::unique_lock<std::mutex> lk(m_mtx);
+ m_waiters++;
+ while(!is_full())
+ m_cv.wait(lk);
+ m_waiters--;
+ }
+
+ TPOOL_SUPPRESS_TSAN size_t size()
+ {
+ return m_cache.size();
+ }
+};
+
+
+/**
+ Circular, fixed size queue
+ used for the task queue.
+
+ Compared to STL queue, this one is
+ faster, and does not do memory allocations
+*/
+template <typename T> class circular_queue
+{
+
+public:
+ circular_queue(size_t N = 16)
+ : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail()
+ {
+ }
+ bool empty() { return m_head == m_tail; }
+ bool full() { return (m_head + 1) % m_capacity == m_tail; }
+ void clear() { m_head = m_tail = 0; }
+ void resize(size_t new_size)
+ {
+ auto current_size = size();
+ if (new_size <= current_size)
+ return;
+ size_t new_capacity = new_size - 1;
+ std::vector<T> new_buffer(new_capacity);
+ /* Figure out faster way to copy*/
+ size_t i = 0;
+ while (!empty())
+ {
+ T& ele = front();
+ pop();
+ new_buffer[i++] = ele;
+ }
+ m_buffer = new_buffer;
+ m_capacity = new_capacity;
+ m_tail = 0;
+ m_head = current_size;
+ }
+ void push(T ele)
+ {
+ if (full())
+ {
+ assert(size() == m_capacity - 1);
+ resize(size() + 1024);
+ }
+ m_buffer[m_head] = ele;
+ m_head = (m_head + 1) % m_capacity;
+ }
+ void push_front(T ele)
+ {
+ if (full())
+ {
+ resize(size() + 1024);
+ }
+ if (m_tail == 0)
+ m_tail = m_capacity - 1;
+ else
+ m_tail--;
+ m_buffer[m_tail] = ele;
+ }
+ T& front()
+ {
+ assert(!empty());
+ return m_buffer[m_tail];
+ }
+ void pop()
+ {
+ assert(!empty());
+ m_tail = (m_tail + 1) % m_capacity;
+ }
+ size_t size()
+ {
+ if (m_head < m_tail)
+ {
+ return m_capacity - m_tail + m_head;
+ }
+ else
+ {
+ return m_head - m_tail;
+ }
+ }
+
+ /*Iterator over elements in queue.*/
+ class iterator
+ {
+ size_t m_pos;
+ circular_queue<T>* m_queue;
+ public:
+ explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {}
+ iterator& operator++()
+ {
+ m_pos= (m_pos + 1) % m_queue->m_capacity;
+ return *this;
+ }
+ iterator operator++(int)
+ {
+ iterator retval= *this;
+ ++*this;
+ return retval;
+ }
+ bool operator==(iterator other) const { return m_pos == other.m_pos; }
+ bool operator!=(iterator other) const { return !(*this == other); }
+ T& operator*() const { return m_queue->m_buffer[m_pos]; }
+ };
+
+ iterator begin() { return iterator(m_tail, this); }
+ iterator end() { return iterator(m_head, this); }
+private:
+ size_t m_capacity;
+ std::vector<T> m_buffer;
+ size_t m_head;
+ size_t m_tail;
+};
+
+/* Doubly linked list. Intrusive,
+ requires element to have m_next and m_prev pointers.
+*/
+template<typename T> class doubly_linked_list
+{
+public:
+ T* m_first;
+ T* m_last;
+ size_t m_count;
+ doubly_linked_list():m_first(),m_last(),m_count()
+ {}
+ void check()
+ {
+ assert(!m_first || !m_first->m_prev);
+ assert(!m_last || !m_last->m_next);
+ assert((!m_first && !m_last && m_count == 0)
+ || (m_first != 0 && m_last != 0 && m_count > 0));
+ T* current = m_first;
+ for(size_t i=1; i< m_count;i++)
+ {
+ current = current->m_next;
+ }
+ assert(current == m_last);
+ current = m_last;
+ for (size_t i = 1; i < m_count; i++)
+ {
+ current = current->m_prev;
+ }
+ assert(current == m_first);
+ }
+ T* front()
+ {
+ return m_first;
+ }
+ size_t size()
+ {
+ return m_count;
+ }
+ void push_back(T* ele)
+ {
+ ele->m_prev = m_last;
+ if (m_last)
+ m_last->m_next = ele;
+
+ ele->m_next = 0;
+ m_last = ele;
+ if (!m_first)
+ m_first = m_last;
+
+ m_count++;
+ }
+ T* back()
+ {
+ return m_last;
+ }
+ bool empty()
+ {
+ return m_count == 0;
+ }
+ void pop_back()
+ {
+ m_last = m_last->m_prev;
+ if (m_last)
+ m_last->m_next = 0;
+ else
+ m_first = 0;
+ m_count--;
+ }
+ bool contains(T* ele)
+ {
+ if (!ele)
+ return false;
+ T* current = m_first;
+ while(current)
+ {
+ if(current == ele)
+ return true;
+ current = current->m_next;
+ }
+ return false;
+ }
+
+ void erase(T* ele)
+ {
+ assert(contains(ele));
+
+ if (ele == m_first)
+ {
+ m_first = ele->m_next;
+ if (m_first)
+ m_first->m_prev = 0;
+ else
+ m_last = 0;
+ }
+ else if (ele == m_last)
+ {
+ assert(ele->m_prev);
+ m_last = ele->m_prev;
+ m_last->m_next = 0;
+ }
+ else
+ {
+ assert(ele->m_next);
+ assert(ele->m_prev);
+ ele->m_next->m_prev = ele->m_prev;
+ ele->m_prev->m_next = ele->m_next;
+ }
+ m_count--;
+ }
+};
+
+}
diff --git a/tpool/tpool_win.cc b/tpool/tpool_win.cc
new file mode 100644
index 00000000..09fd49d9
--- /dev/null
+++ b/tpool/tpool_win.cc
@@ -0,0 +1,292 @@
+/* Copyright(C) 2019 MariaDB
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#include "tpool_structs.h"
+#include <stdlib.h>
+#include <tpool.h>
+#include <windows.h>
+#include <atomic>
+
+/**
+ Implementation of tpool/aio based on Windows native threadpool.
+*/
+
+namespace tpool
+{
+/**
+ Pool, based on Windows native(Vista+) threadpool.
+*/
+class thread_pool_win : public thread_pool
+{
+ /**
+ Handle per-thread init/term functions.
+ Since it is Windows that creates thread, and not us,
+ it is tricky. We employ thread local storage data
+ and check whether init function was called, inside every callback.
+ */
+ struct tls_data
+ {
+ thread_pool_win *m_pool;
+ ~tls_data()
+ {
+ /* Call thread termination function. */
+ if (!m_pool)
+ return;
+
+ if (m_pool->m_worker_destroy_callback)
+ m_pool->m_worker_destroy_callback();
+
+ m_pool->m_thread_count--;
+ }
+ /** This needs to be called before every IO or simple task callback.*/
+ void callback_prolog(thread_pool_win* pool)
+ {
+ assert(pool);
+ assert(!m_pool || (m_pool == pool));
+ if (m_pool)
+ {
+ // TLS data already initialized.
+ return;
+ }
+ m_pool = pool;
+ m_pool->m_thread_count++;
+ // Call the thread init function.
+ if (m_pool->m_worker_init_callback)
+ m_pool->m_worker_init_callback();
+ }
+ };
+
+ static thread_local struct tls_data tls_data;
+ /** Timer */
+ class native_timer : public timer
+ {
+ std::mutex m_mtx; // protects against parallel execution
+ std::mutex m_shutdown_mtx; // protects m_on
+ PTP_TIMER m_ptp_timer;
+ callback_func m_func;
+ void *m_data;
+ thread_pool_win& m_pool;
+ int m_period;
+ bool m_on;
+
+ static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE callback_instance, void *context,
+ PTP_TIMER callback_timer)
+ {
+ native_timer *timer= (native_timer *) context;
+ tls_data.callback_prolog(&timer->m_pool);
+ std::unique_lock<std::mutex> lk(timer->m_mtx, std::defer_lock);
+ if (!lk.try_lock())
+ {
+ /* Do not try to run timers in parallel */
+ return;
+ }
+ timer->m_func(timer->m_data);
+ dbug_execute_after_task_callback();
+ if (timer->m_period)
+ timer->set_time(timer->m_period, timer->m_period);
+ }
+
+ public:
+ native_timer(thread_pool_win& pool, callback_func func, void* data) :
+ m_mtx(), m_func(func), m_data(data), m_pool(pool), m_period(), m_on(true)
+ {
+ m_ptp_timer= CreateThreadpoolTimer(timer_callback, this, &pool.m_env);
+ }
+ void set_time(int initial_delay_ms, int period_ms) override
+ {
+ std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+ if (!m_on)
+ return;
+ long long initial_delay = -10000LL * initial_delay_ms;
+ SetThreadpoolTimer(m_ptp_timer, NULL, 0, 0);
+ SetThreadpoolTimer(m_ptp_timer, (PFILETIME)&initial_delay, 0, 100);
+ m_period = period_ms;
+ }
+ void disarm() override
+ {
+ std::unique_lock<std::mutex> lk(m_shutdown_mtx);
+ m_on = false;
+ SetThreadpoolTimer(m_ptp_timer, NULL , 0, 0);
+ lk.unlock();
+ /* Don't do it in timer callback, that will hang*/
+ WaitForThreadpoolTimerCallbacks(m_ptp_timer, TRUE);
+ }
+
+ ~native_timer()
+ {
+ disarm();
+ CloseThreadpoolTimer(m_ptp_timer);
+ }
+ };
+ /** AIO handler */
+ class native_aio : public aio
+ {
+ thread_pool_win& m_pool;
+
+ public:
+ native_aio(thread_pool_win &pool, int max_io)
+ : m_pool(pool)
+ {
+ }
+
+ /**
+ Submit async IO.
+ */
+ virtual int submit_io(aiocb* cb) override
+ {
+ memset((OVERLAPPED *)cb, 0, sizeof(OVERLAPPED));
+
+ ULARGE_INTEGER uli;
+ uli.QuadPart = cb->m_offset;
+ cb->Offset = uli.LowPart;
+ cb->OffsetHigh = uli.HighPart;
+ cb->m_internal = this;
+ StartThreadpoolIo(cb->m_fh.m_ptp_io);
+
+ BOOL ok;
+ if (cb->m_opcode == aio_opcode::AIO_PREAD)
+ ok = ReadFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+ else
+ ok = WriteFile(cb->m_fh.m_handle, cb->m_buffer, cb->m_len, 0, cb);
+
+ if (ok || (GetLastError() == ERROR_IO_PENDING))
+ return 0;
+
+ CancelThreadpoolIo(cb->m_fh.m_ptp_io);
+ return -1;
+ }
+
+ /**
+ PTP_WIN32_IO_CALLBACK-typed function, required parameter for
+ CreateThreadpoolIo(). The user callback and other auxiliary data is put into
+ the extended OVERLAPPED parameter.
+ */
+ static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance,
+ PVOID context, PVOID overlapped,
+ ULONG io_result, ULONG_PTR nbytes,
+ PTP_IO io)
+ {
+ aiocb* cb = (aiocb*)overlapped;
+ native_aio* aio = (native_aio*)cb->m_internal;
+ tls_data.callback_prolog(&aio->m_pool);
+ cb->m_err = io_result;
+ cb->m_ret_len = (int)nbytes;
+ cb->m_internal_task.m_func = cb->m_callback;
+ cb->m_internal_task.m_group = cb->m_group;
+ cb->m_internal_task.m_arg = cb;
+ cb->m_internal_task.execute();
+ }
+
+ /**
+ Binds the file handle via CreateThreadpoolIo().
+ */
+ virtual int bind(native_file_handle& fd) override
+ {
+ fd.m_ptp_io =
+ CreateThreadpoolIo(fd.m_handle, io_completion_callback, 0, &(m_pool.m_env));
+ if (fd.m_ptp_io)
+ return 0;
+ return -1;
+ }
+
+ /**
+ Unbind the file handle via CloseThreadpoolIo.
+ */
+ virtual int unbind(const native_file_handle& fd) override
+ {
+ if (fd.m_ptp_io)
+ CloseThreadpoolIo(fd.m_ptp_io);
+ return 0;
+ }
+ };
+
+ PTP_POOL m_ptp_pool;
+ TP_CALLBACK_ENVIRON m_env;
+ PTP_CLEANUP_GROUP m_cleanup;
+ const int TASK_CACHE_SIZE= 10000;
+
+ struct task_cache_entry
+ {
+ thread_pool_win *m_pool;
+ task* m_task;
+ };
+ cache<task_cache_entry> m_task_cache;
+ std::atomic<int> m_thread_count;
+public:
+ thread_pool_win(int min_threads= 0, int max_threads= 0)
+ : m_task_cache(TASK_CACHE_SIZE),m_thread_count(0)
+ {
+ InitializeThreadpoolEnvironment(&m_env);
+ m_ptp_pool= CreateThreadpool(NULL);
+ m_cleanup= CreateThreadpoolCleanupGroup();
+ SetThreadpoolCallbackPool(&m_env, m_ptp_pool);
+ SetThreadpoolCallbackCleanupGroup(&m_env, m_cleanup, 0);
+ if (min_threads)
+ SetThreadpoolThreadMinimum(m_ptp_pool, min_threads);
+ if (max_threads)
+ SetThreadpoolThreadMaximum(m_ptp_pool, max_threads);
+ }
+ ~thread_pool_win()
+ {
+ CloseThreadpoolCleanupGroupMembers(m_cleanup, TRUE, NULL);
+ CloseThreadpoolCleanupGroup(m_cleanup);
+ CloseThreadpool(m_ptp_pool);
+
+ // Wait until all threads finished and TLS destructors ran.
+ while(m_thread_count)
+ Sleep(1);
+ }
+ /**
+ PTP_SIMPLE_CALLBACK-typed function, used by TrySubmitThreadpoolCallback()
+ */
+ static void CALLBACK task_callback(PTP_CALLBACK_INSTANCE, void *param)
+ {
+ auto entry= (task_cache_entry *) param;
+ auto task= entry->m_task;
+
+ tls_data.callback_prolog(entry->m_pool);
+
+ entry->m_pool->m_task_cache.put(entry);
+
+ task->execute();
+ }
+ virtual void submit_task(task *task) override
+ {
+ auto entry= m_task_cache.get();
+ task->add_ref();
+ entry->m_pool= this;
+ entry->m_task= task;
+ if (!TrySubmitThreadpoolCallback(task_callback, entry, &m_env))
+ abort();
+ }
+
+ aio *create_native_aio(int max_io) override
+ {
+ return new native_aio(*this, max_io);
+ }
+
+ timer* create_timer(callback_func func, void* data) override
+ {
+ return new native_timer(*this, func, data);
+ }
+};
+
+thread_local struct thread_pool_win::tls_data thread_pool_win::tls_data;
+
+thread_pool *create_thread_pool_win(int min_threads, int max_threads)
+{
+ return new thread_pool_win(min_threads, max_threads);
+}
+} // namespace tpool
diff --git a/tpool/wait_notification.cc b/tpool/wait_notification.cc
new file mode 100644
index 00000000..7743e2db
--- /dev/null
+++ b/tpool/wait_notification.cc
@@ -0,0 +1,25 @@
+#include <tpool.h>
+
+namespace tpool
+{
+static thread_local tpool::thread_pool* tls_thread_pool;
+
+extern "C" void set_tls_pool(tpool::thread_pool* pool)
+{
+ tls_thread_pool = pool;
+}
+
+extern "C" void tpool_wait_begin()
+{
+ if (tls_thread_pool)
+ tls_thread_pool->wait_begin();
+}
+
+
+extern "C" void tpool_wait_end()
+{
+ if (tls_thread_pool)
+ tls_thread_pool->wait_end();
+}
+
+}