summaryrefslogtreecommitdiffstats
path: root/sql/threadpool_common.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-13 12:24:36 +0000
commit06eaf7232e9a920468c0f8d74dcf2fe8b555501c (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /sql/threadpool_common.cc
parentInitial commit. (diff)
downloadmariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.tar.xz
mariadb-06eaf7232e9a920468c0f8d74dcf2fe8b555501c.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'sql/threadpool_common.cc')
-rw-r--r--sql/threadpool_common.cc636
1 files changed, 636 insertions, 0 deletions
diff --git a/sql/threadpool_common.cc b/sql/threadpool_common.cc
new file mode 100644
index 00000000..22730550
--- /dev/null
+++ b/sql/threadpool_common.cc
@@ -0,0 +1,636 @@
+/* Copyright (C) 2012, 2020, MariaDB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+
+#include "mariadb.h"
+#include <violite.h>
+#include <sql_priv.h>
+#include <sql_class.h>
+#include <my_pthread.h>
+#include <scheduler.h>
+#include <sql_connect.h>
+#include <sql_audit.h>
+#include <debug_sync.h>
+#include <threadpool.h>
+#include <sql_class.h>
+#include <sql_parse.h>
+
+#ifdef WITH_WSREP
+#include "wsrep_trans_observer.h"
+#endif /* WITH_WSREP */
+
+#ifdef _WIN32
+#include "threadpool_winsockets.h"
+#endif
+
+/* Threadpool parameters */
+
+uint threadpool_min_threads;
+uint threadpool_idle_timeout;
+uint threadpool_size;
+uint threadpool_max_size;
+uint threadpool_stall_limit;
+uint threadpool_max_threads;
+uint threadpool_oversubscribe;
+uint threadpool_mode;
+uint threadpool_prio_kickup_timer;
+my_bool threadpool_exact_stats;
+my_bool threadpool_dedicated_listener;
+
+/* Stats */
+TP_STATISTICS tp_stats;
+
+
+static void threadpool_remove_connection(THD *thd);
+static dispatch_command_return threadpool_process_request(THD *thd);
+static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);
+
+extern bool do_command(THD*);
+
+static inline TP_connection *get_TP_connection(THD *thd)
+{
+ return (TP_connection *)thd->event_scheduler.data;
+}
+
+/*
+ Worker threads contexts, and THD contexts.
+ =========================================
+
+ Both worker threads and connections have their sets of thread local variables
+ At the moment it is mysys_var (this has specific data for dbug, my_error and
+ similar goodies), and PSI per-client structure.
+
+ Whenever query is executed following needs to be done:
+
+ 1. Save worker thread context.
+ 2. Change TLS variables to connection specific ones using thread_attach(THD*).
+ This function does some additional work , e.g setting up
+ thread_stack/thread_ends_here pointers.
+ 3. Process query
+ 4. Restore worker thread context.
+
+ Connection login and termination follows similar schema w.r.t saving and
+ restoring contexts.
+
+ For both worker thread, and for the connection, mysys variables are created
+ using my_thread_init() and freed with my_thread_end().
+
+*/
+struct Worker_thread_context
+{
+ PSI_thread *psi_thread;
+ st_my_thread_var* mysys_var;
+
+ Worker_thread_context()
+ {
+ psi_thread= PSI_CALL_get_thread();
+ mysys_var= my_thread_var;
+ }
+
+ ~Worker_thread_context()
+ {
+ PSI_CALL_set_thread(psi_thread);
+ set_mysys_var(mysys_var);
+ set_current_thd(nullptr);
+ }
+};
+
+
+#ifdef HAVE_PSI_INTERFACE
+
+/*
+ The following fixes PSI "idle" psi instrumentation.
+ The server assumes that connection becomes idle
+ just before net_read_packet() and switches to active after it.
+ In out setup, server becomes idle when async socket io is made.
+*/
+
+extern void net_before_header_psi(struct st_net *net, void *user_data, size_t);
+
+static void dummy_before_header(struct st_net *, void *, size_t)
+{
+}
+
+static void re_init_net_server_extension(THD *thd)
+{
+ thd->m_net_server_extension.m_before_header = dummy_before_header;
+}
+
+#else
+
+#define re_init_net_server_extension(thd)
+
+#endif /* HAVE_PSI_INTERFACE */
+
+static inline bool has_unread_compressed_data(const NET *net)
+{
+ return net->compress && net->remain_in_buf;
+}
+
+static inline void set_thd_idle(THD *thd)
+{
+ thd->net.reading_or_writing= 1;
+#ifdef HAVE_PSI_INTERFACE
+ if (!has_unread_compressed_data(&thd->net))
+ net_before_header_psi(&thd->net, thd, 0);
+#endif
+}
+
+/*
+ Per OS thread info (ID and pthread_self)
+ stored as TLS, because of syscall overhead
+ (on Linux)
+*/
+struct OS_thread_info
+{
+ pthread_t self;
+ ssize_t stack_size;
+ uint32_t thread_id;
+
+ inline bool initialized() { return stack_size != 0; }
+
+ void init(ssize_t ssize)
+ {
+#if _WIN32
+ self= thread_id= GetCurrentThreadId();
+#else
+#ifdef __NR_gettid
+ thread_id= (uint32) syscall(__NR_gettid);
+#else
+ thread_id= 0;
+#endif
+ self= pthread_self();
+#endif
+ stack_size= ssize;
+ }
+};
+static thread_local OS_thread_info os_thread_info;
+
+static const OS_thread_info *get_os_thread_info()
+{
+ auto *res= &os_thread_info;
+ if (!res->initialized())
+ res->init((ssize_t) (my_thread_stack_size * STACK_DIRECTION));
+ return res;
+}
+
+/*
+ Attach/associate the connection with the OS thread,
+*/
+static void thread_attach(THD* thd)
+{
+#ifdef WITH_WSREP
+ /* Wait until possible background rollback has finished before
+ attaching the thd. */
+ wsrep_wait_rollback_complete_and_acquire_ownership(thd);
+#endif /* WITH_WSREP */
+ set_mysys_var(thd->mysys_var);
+ thd->thread_stack=(char*)&thd;
+ set_current_thd(thd);
+ auto tinfo= get_os_thread_info();
+ thd->real_id= tinfo->self;
+ thd->os_thread_id= tinfo->thread_id;
+ DBUG_ASSERT(thd->mysys_var == my_thread_var);
+ thd->mysys_var->stack_ends_here= thd->thread_stack + tinfo->stack_size;
+ PSI_CALL_set_thread(thd->get_psi());
+}
+
+/*
+ Determine connection priority , using current
+ transaction state and 'threadpool_priority' variable value.
+*/
+static TP_PRIORITY get_priority(TP_connection *c)
+{
+ DBUG_ASSERT(c->thd == current_thd);
+ TP_PRIORITY prio= (TP_PRIORITY)c->thd->variables.threadpool_priority;
+ if (prio == TP_PRIORITY_AUTO)
+ prio= c->thd->transaction->is_active() ? TP_PRIORITY_HIGH : TP_PRIORITY_LOW;
+
+ return prio;
+}
+
+
+void tp_callback(TP_connection *c)
+{
+ DBUG_ASSERT(c);
+
+ Worker_thread_context worker_context;
+
+ THD *thd= c->thd;
+
+ c->state = TP_STATE_RUNNING;
+
+ if (unlikely(!thd))
+ {
+ /* No THD, need to login first. */
+ DBUG_ASSERT(c->connect);
+ thd= c->thd= threadpool_add_connection(c->connect, c);
+ if (!thd)
+ {
+ /* Bail out on connect error.*/
+ goto error;
+ }
+ c->connect= 0;
+ }
+ else
+ {
+retry:
+ switch(threadpool_process_request(thd))
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ if (!thd->async_state.try_suspend())
+ {
+ /*
+ All async operations finished meanwhile, thus nobody is will wake up
+ this THD. Therefore, we'll resume "manually" here.
+ */
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ goto retry;
+ }
+ return;
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ /* QUIT or an error occurred. */
+ goto error;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+ thd->async_state.m_state= thd_async_state::enum_async_state::NONE;
+ }
+
+ /* Set priority */
+ c->priority= get_priority(c);
+
+ /* Read next command from client. */
+ c->set_io_timeout(thd->get_net_wait_timeout());
+ c->state= TP_STATE_IDLE;
+ if (c->start_io())
+ goto error;
+ return;
+
+error:
+ c->thd= 0;
+ if (thd)
+ {
+ threadpool_remove_connection(thd);
+ }
+ delete c;
+}
+
+
+static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
+{
+ THD *thd= NULL;
+
+ /*
+ Create a new connection context: mysys_thread_var and PSI thread
+ Store them in THD.
+ */
+
+ set_mysys_var(NULL);
+ my_thread_init();
+ st_my_thread_var* mysys_var= my_thread_var;
+ PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, connect, 0));
+ if (!mysys_var ||!(thd= connect->create_thd(NULL)))
+ {
+ /* Out of memory? */
+ connect->close_and_delete();
+ if (mysys_var)
+ my_thread_end();
+ return NULL;
+ }
+
+ thd->event_scheduler.data= c;
+ server_threads.insert(thd); // Make THD visible in show processlist
+ delete connect; // must be after server_threads.insert, see close_connections()
+ thd->set_mysys_var(mysys_var);
+
+ /* Login. */
+ thread_attach(thd);
+ mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
+ re_init_net_server_extension(thd);
+ ulonglong now= microsecond_interval_timer();
+ thd->prior_thr_create_utime= now;
+ thd->start_utime= now;
+ thd->thr_create_utime= now;
+
+ setup_connection_thread_globals(thd);
+
+ if (thd_prepare_connection(thd))
+ goto end;
+
+ c->init_vio(thd->net.vio);
+
+ /*
+ Check if THD is ok, as prepare_new_connection_state()
+ can fail, for example if init command failed.
+ */
+ if (!thd_is_connection_alive(thd))
+ goto end;
+
+ thd->skip_wait_timeout= true;
+ set_thd_idle(thd);
+ return thd;
+
+end:
+ threadpool_remove_connection(thd);
+ return NULL;
+}
+
+
+static void threadpool_remove_connection(THD *thd)
+{
+ thread_attach(thd);
+ thd->net.reading_or_writing = 0;
+ end_connection(thd);
+ close_connection(thd, 0);
+ unlink_thd(thd);
+ PSI_CALL_delete_current_thread(); // before THD is destroyed
+ delete thd;
+
+ /*
+ Free resources associated with this connection:
+ mysys thread_var and PSI thread.
+ */
+ my_thread_end();
+}
+
+
+/*
+ Ensure that proper error message is sent to client,
+ and "aborted" message appears in the log in case of
+ wait timeout.
+
+ See also timeout handling in net_serv.cc
+*/
+static void handle_wait_timeout(THD *thd)
+{
+ thd->get_stmt_da()->reset_diagnostics_area();
+ thd->reset_killed();
+ my_error(ER_NET_READ_INTERRUPTED, MYF(0));
+ thd->net.last_errno= ER_NET_READ_INTERRUPTED;
+ thd->net.error= 2;
+}
+
+/** Check if some client data is cached in thd->net or thd->net.vio */
+static bool has_unread_data(THD* thd)
+{
+ NET *net= &thd->net;
+ Vio *vio= net->vio;
+ return vio->has_data(vio) || has_unread_compressed_data(net);
+}
+
+
+/**
+ Process a single client request or a single batch.
+*/
+static dispatch_command_return threadpool_process_request(THD *thd)
+{
+ dispatch_command_return retval= DISPATCH_COMMAND_SUCCESS;
+
+ thread_attach(thd);
+ if(thd->async_state.m_state == thd_async_state::enum_async_state::RESUMED)
+ goto resume;
+
+ if (thd->killed >= KILL_CONNECTION)
+ {
+ /*
+ killed flag was set by timeout handler
+ or KILL command. Return error.
+ */
+ retval= DISPATCH_COMMAND_CLOSE_CONNECTION;
+ if(thd->killed == KILL_WAIT_TIMEOUT)
+ handle_wait_timeout(thd);
+ goto end;
+ }
+
+
+ /*
+ In the loop below, the flow is essentially the copy of
+ thead-per-connections
+ logic, see do_handle_one_connection() in sql_connect.c
+
+ The goal is to execute a single query, thus the loop is normally executed
+ only once. However for SSL connections, it can be executed multiple times
+ (SSL can preread and cache incoming data, and vio->has_data() checks if it
+ was the case).
+ */
+ for(;;)
+ {
+ thd->net.reading_or_writing= 0;
+ if (mysql_audit_release_required(thd))
+ mysql_audit_release(thd);
+
+resume:
+ retval= do_command(thd, false);
+ switch(retval)
+ {
+ case DISPATCH_COMMAND_WOULDBLOCK:
+ case DISPATCH_COMMAND_CLOSE_CONNECTION:
+ goto end;
+ case DISPATCH_COMMAND_SUCCESS:
+ break;
+ }
+
+ if (!thd_is_connection_alive(thd))
+ {
+ retval=DISPATCH_COMMAND_CLOSE_CONNECTION;
+ goto end;
+ }
+
+ set_thd_idle(thd);
+
+ if (!has_unread_data(thd))
+ {
+ /* More info on this debug sync is in sql_parse.cc*/
+ DEBUG_SYNC(thd, "before_do_command_net_read");
+ goto end;
+ }
+ }
+
+end:
+ return retval;
+}
+
+
+static TP_pool *pool;
+
+static bool tp_init()
+{
+
+#ifdef _WIN32
+ if (threadpool_mode == TP_MODE_WINDOWS)
+ pool= new (std::nothrow) TP_pool_win;
+ else
+ pool= new (std::nothrow) TP_pool_generic;
+#else
+ pool= new (std::nothrow) TP_pool_generic;
+#endif
+ if (!pool)
+ return true;
+ if (pool->init())
+ {
+ delete pool;
+ pool= 0;
+ return true;
+ }
+#ifdef _WIN32
+ init_win_aio_buffers(max_connections);
+#endif
+ return false;
+}
+
+static void tp_add_connection(CONNECT *connect)
+{
+ TP_connection *c= pool->new_connection(connect);
+ DBUG_EXECUTE_IF("simulate_failed_connection_1", delete c ; c= 0;);
+ if (c)
+ pool->add(c);
+ else
+ connect->close_and_delete();
+}
+
+int tp_get_idle_thread_count()
+{
+ return pool? pool->get_idle_thread_count(): 0;
+}
+
+int tp_get_thread_count()
+{
+ return pool ? pool->get_thread_count() : 0;
+}
+
+void tp_set_min_threads(uint val)
+{
+ if (pool)
+ pool->set_min_threads(val);
+}
+
+
+void tp_set_max_threads(uint val)
+{
+ if (pool)
+ pool->set_max_threads(val);
+}
+
+void tp_set_threadpool_size(uint val)
+{
+ if (pool)
+ pool->set_pool_size(val);
+}
+
+
+void tp_set_threadpool_stall_limit(uint val)
+{
+ if (pool)
+ pool->set_stall_limit(val);
+}
+
+
+void tp_timeout_handler(TP_connection *c)
+{
+ if (c->state != TP_STATE_IDLE)
+ return;
+ THD *thd= c->thd;
+ mysql_mutex_lock(&thd->LOCK_thd_kill);
+ Vio *vio= thd->net.vio;
+ if (vio && (vio_pending(vio) > 0 || vio->has_data(vio)) &&
+ c->state == TP_STATE_IDLE)
+ {
+ /*
+ There is some data on that connection, i.e
+ i.e there was no inactivity timeout.
+ Don't kill.
+ */
+ c->state= TP_STATE_PENDING;
+ }
+ else if (c->state == TP_STATE_IDLE)
+ {
+ thd->set_killed_no_mutex(KILL_WAIT_TIMEOUT);
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+ }
+ mysql_mutex_unlock(&thd->LOCK_thd_kill);
+}
+
+MY_ALIGNED(CPU_LEVEL1_DCACHE_LINESIZE) Atomic_counter<unsigned long long> tp_waits[THD_WAIT_LAST];
+
+static void tp_wait_begin(THD *thd, int type)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ {
+ DBUG_ASSERT(type > 0 && type < THD_WAIT_LAST);
+ tp_waits[type]++;
+ c->wait_begin(type);
+ }
+}
+
+
+static void tp_wait_end(THD *thd)
+{
+ TP_connection *c = get_TP_connection(thd);
+ if (c)
+ c->wait_end();
+}
+
+
+static void tp_end()
+{
+ delete pool;
+#ifdef _WIN32
+ destroy_win_aio_buffers();
+#endif
+}
+
+static void tp_post_kill_notification(THD *thd)
+{
+ TP_connection *c= get_TP_connection(thd);
+ if (c)
+ c->priority= TP_PRIORITY_HIGH;
+ post_kill_notification(thd);
+}
+
+/* Resume previously suspended THD */
+static void tp_resume(THD* thd)
+{
+ DBUG_ASSERT(thd->async_state.m_state == thd_async_state::enum_async_state::SUSPENDED);
+ thd->async_state.m_state = thd_async_state::enum_async_state::RESUMED;
+ TP_connection* c = get_TP_connection(thd);
+ pool->resume(c);
+}
+
+static scheduler_functions tp_scheduler_functions=
+{
+ 0, // max_threads
+ NULL,
+ NULL,
+ tp_init, // init
+ tp_add_connection, // add_connection
+ tp_wait_begin, // thd_wait_begin
+ tp_wait_end, // thd_wait_end
+ tp_post_kill_notification, // post kill notification
+ tp_end, // end
+ tp_resume
+};
+
+void pool_of_threads_scheduler(struct scheduler_functions *func,
+ ulong *arg_max_connections,
+ Atomic_counter<uint> *arg_connection_count)
+{
+ *func = tp_scheduler_functions;
+ func->max_threads= threadpool_max_threads;
+ func->max_connections= arg_max_connections;
+ func->connection_count= arg_connection_count;
+ scheduler_init();
+}