diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-13 12:24:36 +0000 |
commit | 06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /sql/threadpool_win.cc | |
parent | Initial commit. (diff) | |
download | mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/threadpool_win.cc')
-rw-r--r-- | sql/threadpool_win.cc | 447 |
1 files changed, 447 insertions, 0 deletions
diff --git a/sql/threadpool_win.cc b/sql/threadpool_win.cc new file mode 100644 index 00000000..ed68e31c --- /dev/null +++ b/sql/threadpool_win.cc @@ -0,0 +1,447 @@ +/* Copyright (C) 2012 Monty Program Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#ifdef _WIN32_WINNT +#undef _WIN32_WINNT +#endif + +#define _WIN32_WINNT 0x0601 + +#include "mariadb.h" +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <threadpool.h> +#include <windows.h> +#include <set_var.h> + +#include "threadpool_winsockets.h" + +/* Log a warning */ +static void tp_log_warning(const char *msg, const char *fct) +{ + sql_print_warning("Threadpool: %s. %s failed (last error %d)",msg, fct, + GetLastError()); +} + + +static PTP_POOL pool; +static TP_CALLBACK_ENVIRON callback_environ; +static DWORD fls; + +PTP_CALLBACK_ENVIRON get_threadpool_win_callback_environ() +{ + return pool? &callback_environ: 0; +} + +/* + Threadpool callbacks. + + io_completion_callback - handle client request + timer_callback - handle wait timeout (kill connection) + login_callback - user login (submitted as threadpool work) + +*/ + +static void CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PTP_TIMER timer); + +static void CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io); + + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work); + +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance); + +/* Get current time as Windows time */ +static ulonglong now() +{ + ulonglong current_time; + GetSystemTimeAsFileTime((PFILETIME)¤t_time); + return current_time; +} + +struct TP_connection_win:public TP_connection +{ +public: + TP_connection_win(CONNECT*); + ~TP_connection_win(); + int init() override; + void init_vio(st_vio *vio) override; + int start_io() override; + void set_io_timeout(int sec) override; + void wait_begin(int type) override; + void wait_end() override; + + ulonglong timeout=ULLONG_MAX; + OVERLAPPED overlapped{}; + PTP_CALLBACK_INSTANCE callback_instance{}; + PTP_IO io{}; + PTP_TIMER timer{}; + PTP_WORK work{}; + bool long_callback{}; + win_aiosocket sock; +}; + +struct TP_connection *new_TP_connection(CONNECT *connect) +{ + TP_connection *c = new (std::nothrow) TP_connection_win(connect); + if (!c || c->init()) + { + delete c; + return 0; + } + return c; +} + +void TP_pool_win::add(TP_connection *c) +{ + if(FlsGetValue(fls)) + { + /* Inside threadpool(), execute callback directly. */ + tp_callback(c); + } + else + { + SubmitThreadpoolWork(((TP_connection_win *)c)->work); + } +} + +void TP_pool_win::resume(TP_connection* c) +{ + DBUG_ASSERT(c->state == TP_STATE_RUNNING); + SubmitThreadpoolWork(((TP_connection_win*)c)->work); +} + +#define CHECK_ALLOC_ERROR(op) \ + do \ + { \ + if (!(op)) \ + { \ + tp_log_warning("Allocation failed", #op); \ + } \ + } while (0) + +TP_connection_win::TP_connection_win(CONNECT *c) : + TP_connection(c) +{ + /* Assign io completion callback */ + HANDLE h= c->vio_type == VIO_TYPE_NAMEDPIPE ? c->pipe + : (HANDLE)mysql_socket_getfd(c->sock); + + CHECK_ALLOC_ERROR(io=CreateThreadpoolIo(h, io_completion_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(timer= CreateThreadpoolTimer(timer_callback, this, &callback_environ)); + CHECK_ALLOC_ERROR(work= CreateThreadpoolWork(work_callback, this, &callback_environ)); +} + +int TP_connection_win::init() +{ + return !io || !timer || !work ; +} + +void TP_connection_win::init_vio(st_vio* vio) +{ + sock.init(vio); +} + +/* + Start asynchronous read +*/ +int TP_connection_win::start_io() +{ + StartThreadpoolIo(io); + if (sock.begin_read()) + { + /* Some error occurred */ + CancelThreadpoolIo(io); + return -1; + } + return 0; +} + +/* + Recalculate wait timeout, maybe reset timer. +*/ +void TP_connection_win::set_io_timeout(int timeout_sec) +{ + ulonglong old_timeout= timeout; + ulonglong new_timeout = now() + 10000000LL * timeout_sec; + + if (new_timeout < old_timeout) + { + SetThreadpoolTimer(timer, (PFILETIME)&new_timeout, 0, 1000); + } + /* new_timeout > old_timeout case is handled by expiring timer. */ + timeout = new_timeout; +} + + +TP_connection_win::~TP_connection_win() +{ + if (io) + CloseThreadpoolIo(io); + + if (work) + CloseThreadpoolWork(work); + + if (timer) + { + SetThreadpoolTimer(timer, 0, 0, 0); + WaitForThreadpoolTimerCallbacks(timer, TRUE); + CloseThreadpoolTimer(timer); + } +} + +void TP_connection_win::wait_begin(int type) +{ + /* + Signal to the threadpool whenever callback can run long. Currently, binlog + waits are a good candidate, its waits are really long + */ + if (type == THD_WAIT_BINLOG) + { + if (!long_callback && callback_instance) + { + CallbackMayRunLong(callback_instance); + long_callback= true; + } + } +} + +void TP_connection_win::wait_end() +{ + /* Do we need to do anything ? */ +} + +/* + This function should be called first whenever a callback is invoked in the + threadpool, does my_thread_init() if not yet done +*/ +void tp_win_callback_prolog() +{ + if (FlsGetValue(fls) == NULL) + { + /* Running in new worker thread*/ + FlsSetValue(fls, (void *)1); + thread_created++; + tp_stats.num_worker_threads++; + my_thread_init(); + } +} + +extern ulong thread_created; +static void pre_callback(PVOID context, PTP_CALLBACK_INSTANCE instance) +{ + tp_win_callback_prolog(); + TP_connection_win *c = (TP_connection_win *)context; + c->callback_instance = instance; + c->long_callback = false; +} + + +/* + Decrement number of threads when a thread exits. + On Windows, FlsAlloc() provides the thread destruction callbacks. +*/ +static VOID WINAPI thread_destructor(void *data) +{ + if(data) + { + tp_stats.num_worker_threads--; + my_thread_end(); + } +} + + + +static inline void tp_callback(PTP_CALLBACK_INSTANCE instance, PVOID context) +{ + pre_callback(context, instance); + tp_callback((TP_connection *)context); +} + + +/* + Handle read completion/notification. +*/ +static VOID CALLBACK io_completion_callback(PTP_CALLBACK_INSTANCE instance, + PVOID context, PVOID overlapped, ULONG io_result, ULONG_PTR nbytes, PTP_IO io) +{ + TP_connection_win *c= (TP_connection_win *)context; + + /* How many bytes were preread into read buffer */ + c->sock.end_read((ULONG)nbytes, io_result); + + /* + Execute high priority connections immediately. + 'Yield' in case of low priority connections, i.e SubmitThreadpoolWork (with the same callback) + which makes Windows threadpool place the items at the end of its internal work queue. + */ + if (c->priority == TP_PRIORITY_HIGH) + tp_callback(instance, context); + else + SubmitThreadpoolWork(c->work); +} + + +/* + Timer callback. + Invoked when connection times out (wait_timeout) +*/ +static VOID CALLBACK timer_callback(PTP_CALLBACK_INSTANCE instance, + PVOID parameter, PTP_TIMER timer) +{ + TP_connection_win *c = (TP_connection_win *)parameter; + if (c->timeout <= now()) + { + tp_timeout_handler(c); + } + else + { + /* + Reset timer. + There is a tiny possibility of a race condition, since the value of timeout + could have changed to smaller value in the thread doing io callback. + + Given the relative unimportance of the wait timeout, we accept race + condition. + */ + SetThreadpoolTimer(timer, (PFILETIME)&c->timeout, 0, 1000); + } +} + +static void CALLBACK work_callback(PTP_CALLBACK_INSTANCE instance, PVOID context, PTP_WORK work) +{ + tp_callback(instance, context); +} + +TP_pool_win::TP_pool_win() +{} + +int TP_pool_win::init() +{ + fls= FlsAlloc(thread_destructor); + pool= CreateThreadpool(NULL); + + if (!pool) + { + sql_print_error("Can't create threadpool. " + "CreateThreadpool() failed with %d. Likely cause is memory pressure", + GetLastError()); + return -1; + } + + InitializeThreadpoolEnvironment(&callback_environ); + SetThreadpoolCallbackPool(&callback_environ, pool); + + if (IS_SYSVAR_AUTOSIZE(&threadpool_max_threads)) + { + /* + Nr 500 comes from Microsoft documentation, + there is no API for GetThreadpoolThreadMaxThreads() + */ + SYSVAR_AUTOSIZE(threadpool_max_threads,500); + } + else + { + SetThreadpoolThreadMaximum(pool, threadpool_max_threads); + } + + if (IS_SYSVAR_AUTOSIZE(&threadpool_min_threads)) + { + SYSVAR_AUTOSIZE(threadpool_min_threads,1); + } + else + { + if (!SetThreadpoolThreadMinimum(pool, threadpool_min_threads)) + { + tp_log_warning("Can't set threadpool minimum threads", + "SetThreadpoolThreadMinimum"); + } + } + + + if (IS_SYSVAR_AUTOSIZE(&global_system_variables.threadpool_priority)) + { + /* + There is a notable overhead for "auto" priority implementation, + use "high" which handles socket IO callbacks as they come + without rescheduling to work queue. + */ + SYSVAR_AUTOSIZE(global_system_variables.threadpool_priority, + TP_PRIORITY_HIGH); + } + + TP_POOL_STACK_INFORMATION stackinfo; + stackinfo.StackCommit = 0; + stackinfo.StackReserve = (SIZE_T)my_thread_stack_size; + if (!SetThreadpoolStackInformation(pool, &stackinfo)) + { + tp_log_warning("Can't set threadpool stack size", + "SetThreadpoolStackInformation"); + } + return 0; +} + + +/** + Scheduler callback : Destroy the scheduler. +*/ +TP_pool_win::~TP_pool_win() +{ + if (!pool) + return; + DestroyThreadpoolEnvironment(&callback_environ); + SetThreadpoolThreadMaximum(pool, 0); + CloseThreadpool(pool); + if (!tp_stats.num_worker_threads) + FlsFree(fls); +} +/** + Sets the number of idle threads the thread pool maintains in anticipation of new + requests. +*/ +int TP_pool_win::set_min_threads(uint val) +{ + SetThreadpoolThreadMinimum(pool, val); + return 0; +} + +int TP_pool_win::set_max_threads(uint val) +{ + SetThreadpoolThreadMaximum(pool, val); + return 0; +} + + +TP_connection *TP_pool_win::new_connection(CONNECT *connect) +{ + TP_connection *c= new (std::nothrow) TP_connection_win(connect); + if (!c ) + return 0; + if (c->init()) + { + delete c; + return 0; + } + return c; +} + |