diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:07:14 +0000 |
commit | a175314c3e5827eb193872241446f2f8f5c9d33c (patch) | |
tree | cd3d60ca99ae00829c52a6ca79150a5b6e62528b /sql/threadpool_win.cc | |
parent | Initial commit. (diff) | |
download | mariadb-10.5-upstream.tar.xz mariadb-10.5-upstream.zip |
Adding upstream version 1:10.5.12.upstream/1%10.5.12upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | sql/threadpool_win.cc | 482 |
1 files changed, 482 insertions, 0 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc new file mode 100644 index 00000000..6003b06b --- /dev/null +++ b/sql/threadpool_win.cc @@ -0,0 +1,482 @@ +/* Copyright (C) 2012 Monty Program Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif + +#define _WIN32_WINNT 0x0601 + +#include "mariadb.h" +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <threadpool.h> +#include <windows.h> + +/* Log a warning */ +static void tp_log_warning(const char *msg, const char *fct) +{ + sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct, + GetLastError()); +} + + +static PTP_POOL pool; +static TP_CALLBACK_ENVIRON callback_environ; +static DWORD fls; + +static bool skip_completion_port_on_success = false; + +PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ() +{ + return pool? &callback_environ: 0; +} + +/* + Threadpool callbacks. + + io_completion_callback - handle client request + timer_callback - handle wait timeout (kill connection) + login_callback - user login (submitted as threadpool work) + +*/ + +static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_TIMER timer); + +static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work); + +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance); + +/* Get current time as Windows time */ +static ulonglong now() +{ + ulonglong current_time; + GetSystemTimeAsFileTime((PFILETIME)¤t_time); + return current_time; +} + +struct TP_connection_win:public TP_connection +{ +public: + TP_connection_win(CONNECT*); + ~TP_connection_win(); + virtual int init(); + virtual int start_io(); + virtual void set_io_timeout(int sec); + virtual void wait_begin(int type); + virtual void wait_end(); + + ulonglong timeout; + enum_vio_type vio_type; + HANDLE handle; + OVERLAPPED overlapped; + PTP_CALLBACK_INSTANCE callback_instance; + PTP_IO io; + PTP_TIMER timer; + PTP_WORK work; + bool long_callback; + +}; + +struct TP_connection *new_TP_connection(CONNECT *connect) +{ + TP_connection *c = new (std::nothrow) TP_connection_win(connect); + if (!c || c->init()) + { + delete c; + return 0; + } + return c; +} + +void TP_pool_win::add(TP_connection *c) +{ + if(FlsGetValue(fls)) + { + /* Inside threadpool(), execute callback directly. */ + tp_callback(c); + } + else + { + SubmitThreadpoolWork(((TP_connection_win *)c)->work); + } +} + + +TP_connection_win::TP_connection_win(CONNECT *c) : + TP_connection(c), + timeout(ULONGLONG_MAX), + callback_instance(0), + io(0), + timer(0), + work(0) +{ +} + +#define CHECK_ALLOC_ERROR(op) if (!(op)) {tp_log_warning("Allocation failed", #op); DBUG_ASSERT(0); return -1; } + +int TP_connection_win::init() +{ + + memset(&overlapped, 0, sizeof(OVERLAPPED)); + switch ((vio_type = connect->vio_type)) + { + case VIO_TYPE_SSL: + case VIO_TYPE_TCPIP: + handle= (HANDLE) mysql_socket_getfd(connect->sock); + break; + case VIO_TYPE_NAMEDPIPE: + handle= connect->pipe; + break; + default: + abort(); + } + + + /* Performance tweaks (s. MSDN documentation)*/ + UCHAR flags= FILE_SKIP_SET_EVENT_ON_HANDLE; + if (skip_completion_port_on_success) + { + flags |= FILE_SKIP_COMPLETION_PORT_ON_SUCCESS; + } + (void)SetFileCompletionNotificationModes(handle, flags); + /* Assign io completion callback */ + CHECK_ALLOC_ERROR(io= CreateThreadpoolIo(handle, io_completion_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); + return 0; +} + + +/* + Start asynchronous read +*/ +int TP_connection_win::start_io() +{ + DWORD num_bytes = 0; + static char c; + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + DWORD last_error= 0; + + int retval; + StartThreadpoolIo(io); + + if (vio_type == VIO_TYPE_TCPIP || vio_type == VIO_TYPE_SSL) + { + /* Start async io (sockets). */ + if (WSARecv((SOCKET)handle , &buf, 1, &num_bytes, &flags, + &overlapped, NULL) == 0) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= WSAGetLastError(); + } + } + else + { + /* Start async io (named pipe) */ + if (ReadFile(handle, &c, 0, &num_bytes,&overlapped)) + { + retval= last_error= 0; + } + else + { + retval= -1; + last_error= GetLastError(); + } + } + + if (retval == 0 || last_error == ERROR_MORE_DATA) + { + /* + IO successfully finished (synchronously). + If skip_completion_port_on_success is set, we need to handle it right + here, because completion callback would not be executed by the pool. + */ + if (skip_completion_port_on_success) + { + CancelThreadpoolIo(io); + io_completion_callback(callback_instance, this, &overlapped, last_error, + num_bytes, io); + } + return 0; + } + + if (last_error == ERROR_IO_PENDING) + { + return 0; + } + + /* Some error occurred */ + CancelThreadpoolIo(io); + return -1; +} + +/* + Recalculate wait timeout, maybe reset timer. +*/ +void TP_connection_win::set_io_timeout(int timeout_sec) +{ + ulonglong old_timeout= timeout; + ulonglong new_timeout = now() + 10000000LL * timeout_sec; + + if (new_timeout < old_timeout) + { + SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000); + } + /* new_timeout > old_timeout case is handled by expiring timer. */ + timeout = new_timeout; +} + + +TP_connection_win::~TP_connection_win() +{ + if (io) + CloseThreadpoolIo(io); + + if (work) + CloseThreadpoolWork(work); + + if (timer) + { + SetThreadpoolTimer(timer, 0, 0, 0); + WaitForThreadpoolTimerCallbacks(timer, TRUE); + CloseThreadpoolTimer(timer); + } +} + +void TP_connection_win::wait_begin(int type) +{ + /* + Signal to the threadpool whenever callback can run long. Currently, binlog + waits are a good candidate, its waits are really long + */ + if (type == THD_WAIT_BINLOG) + { + if (!long_callback && callback_instance) + { + CallbackMayRunLong(callback_instance); + long_callback= true; + } + } +} + +void TP_connection_win::wait_end() +{ + /* Do we need to do anything ? */ +} + +/* + This function should be called first whenever a callback is invoked in the + threadpool, does my_thread_init() if not yet done +*/ +void tp_win_callback_prolog() +{ + if (FlsGetValue(fls) == NULL) + { + /* Running in new worker thread*/ + FlsSetValue(fls, (void *)1); + statistic_increment(thread_created, &LOCK_status); + tp_stats.num_worker_threads++; + my_thread_init(); + } +} + +extern ulong thread_created; +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) +{ + tp_win_callback_prolog(); + TP_connection_win *c = (TP_connection_win *)context; + c->callback_instance = instance; + c->long_callback = false; +} + + +/* + Decrement number of threads when a thread exits. + On Windows, FlsAlloc() provides the thread destruction callbacks. +*/ +static VOID WINAPI thread_destructor(void *data) +{ + if(data) + { + tp_stats.num_worker_threads--; + my_thread_end(); + } +} + + + +static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context) +{ + pre_callback(context, instance); + tp_callback((TP_connection *)context); +} + + +/* + Handle read completion/notification. +*/ +static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) +{ + TP_connection_win *c= (TP_connection_win *)context; + /* + Execute high priority connections immediately. + 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback) + which makes Windows threadpool place the items at the end of its internal work queue. + */ + if (c->priority == TP_PRIORITY_HIGH) + tp_callback(instance, context); + else + SubmitThreadpoolWork(c->work); +} + + +/* + Timer callback. + Invoked when connection times out (wait_timeout) +*/ +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID parameter, PTP_TIMER timer) +{ + TP_connection_win *c = (TP_connection_win *)parameter; + if (c->timeout <= now()) + { + tp_timeout_handler(c); + } + else + { + /* + Reset timer. + There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. + + Given the relative unimportance of the wait timeout, we accept race + condition. + */ + SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000); + } +} + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work) +{ + tp_callback(instance, context); +} + +TP_pool_win::TP_pool_win() +{} + +int TP_pool_win::init() +{ + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + + if (!pool) + { + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + return -1; + } + + InitializeThreadpoolEnvironment(&callback_environ); + SetThreadpoolCallbackPool(&callback_environ, pool); + + if (threadpool_max_threads) + { + SetThreadpoolThreadMaximum(pool, threadpool_max_threads); + } + + if (threadpool_min_threads) + { + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning("Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } + } + + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) + { + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); + } + return 0; +} + + +/** + Scheduler callback : Destroy the scheduler. +*/ +TP_pool_win::~TP_pool_win() +{ + if (!pool) + return; + DestroyThreadpoolEnvironment(&callback_environ); + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + if (!tp_stats.num_worker_threads) + FlsFree(fls); +} +/** + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. +*/ +int TP_pool_win::set_min_threads(uint val) +{ + SetThreadpoolThreadMinimum(pool, val); + return 0; +} + +int TP_pool_win::set_max_threads(uint val) +{ + SetThreadpoolThreadMaximum(pool, val); + return 0; +} + + +TP_connection *TP_pool_win::new_connection(CONNECT *connect) +{ + TP_connection *c= new (std::nothrow) TP_connection_win(connect); + if (!c ) + return 0; + if (c->init()) + { + delete c; + return 0; + } + return c; +} |