diff options
Diffstat (limited to '')
-rw-r--r-- | sql/threadpool_generic.cc | 1771 |
1 files changed, 1771 insertions, 0 deletions
diff --git a/sql/threadpool_generic.cc b/sql/threadpool_generic.cc new file mode 100644 index 00000000..b34104db --- /dev/null +++ b/sql/threadpool_generic.cc @@ -0,0 +1,1771 @@ +/* Copyright (C) 2012, 2020, MariaDB Corporation. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#if (defined HAVE_POOL_OF_THREADS) && !defined(EMBEDDED_LIBRARY) + +#include "threadpool_generic.h" +#include "mariadb.h" +#include <violite.h> +#include <sql_priv.h> +#include <sql_class.h> +#include <my_pthread.h> +#include <scheduler.h> +#include <sql_connect.h> +#include <mysqld.h> +#include <debug_sync.h> +#include <time.h> +#include <sql_plist.h> +#include <threadpool.h> +#include <algorithm> + +#ifdef HAVE_IOCP +#define OPTIONAL_IO_POLL_READ_PARAM this +#else +#define OPTIONAL_IO_POLL_READ_PARAM 0 +#endif + +static void io_poll_close(TP_file_handle fd) +{ +#ifdef _WIN32 + CloseHandle(fd); +#else + close(fd); +#endif +} + +/** Maximum number of native events a listener can read in one go */ +#define MAX_EVENTS 1024 + +/** Indicates that threadpool was initialized*/ +static bool threadpool_started= false; + +/* + Define PSI Keys for performance schema. + We have a mutex per group, worker threads, condition per worker thread, + and timer thread with its own mutex and condition. +*/ + + +#ifdef HAVE_PSI_INTERFACE +static PSI_mutex_key key_group_mutex; +static PSI_mutex_key key_timer_mutex; +static PSI_mutex_info mutex_list[]= +{ + { &key_group_mutex, "group_mutex", 0}, + { &key_timer_mutex, "timer_mutex", PSI_FLAG_GLOBAL} +}; + +static PSI_cond_key key_worker_cond; +static PSI_cond_key key_timer_cond; +static PSI_cond_info cond_list[]= +{ + { &key_worker_cond, "worker_cond", 0}, + { &key_timer_cond, "timer_cond", PSI_FLAG_GLOBAL} +}; + +static PSI_thread_key key_worker_thread; +static PSI_thread_key key_timer_thread; +static PSI_thread_info thread_list[] = +{ + {&key_worker_thread, "worker_thread", 0}, + {&key_timer_thread, "timer_thread", PSI_FLAG_GLOBAL} +}; + +/* Macro to simplify performance schema registration */ +#define PSI_register(X) \ + if(PSI_server) PSI_server->register_ ## X("threadpool", X ## _list, array_elements(X ## _list)) +#else +#define PSI_register(X) /* no-op */ +#endif + +thread_group_t *all_groups; +static uint group_count; +static Atomic_counter<uint32_t> shutdown_group_count; + +/** + Used for printing "pool blocked" message, see + print_pool_blocked_message(); +*/ +static ulonglong pool_block_start; + +/* Global timer for all groups */ +struct pool_timer_t +{ + mysql_mutex_t mutex; + mysql_cond_t cond; + volatile uint64 current_microtime; + std::atomic<uint64_t> next_timeout_check; + int tick_interval; + bool shutdown; + pthread_t timer_thread_id; +}; + +static pool_timer_t pool_timer; + +static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection); +static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt); +static int wake_thread(thread_group_t *thread_group,bool due_to_stall); +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall=false); +static int create_worker(thread_group_t *thread_group, bool due_to_stall); +static void *worker_main(void *param); +static void check_stall(thread_group_t *thread_group); +static void set_next_timeout_check(ulonglong abstime); +static void print_pool_blocked_message(bool); + +/** + Asynchronous network IO. + + We use native edge-triggered network IO multiplexing facility. + This maps to different APIs on different Unixes. + + Supported are currently Linux with epoll, Solaris with event ports, + OSX and BSD with kevent, Windows with IOCP. All those API's are used with one-shot flags + (the event is signalled once client has written something into the socket, + then socket is removed from the "poll-set" until the command is finished, + and we need to re-arm/re-register socket) + + No implementation for poll/select is currently provided. + + The API closely resembles all of the above mentioned platform APIs + and consists of following functions. + + - io_poll_create() + Creates an io_poll descriptor + On Linux: epoll_create() + + - io_poll_associate_fd(int poll_fd, TP_file_handle fd, void *data, void *opt) + Associate file descriptor with io poll descriptor + On Linux : epoll_ctl(..EPOLL_CTL_ADD)) + + - io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) + Associate file descriptor with io poll descriptor + On Linux: epoll_ctl(..EPOLL_CTL_DEL) + + + - io_poll_start_read(int poll_fd,int fd, void *data, void *opt) + The same as io_poll_associate_fd(), but cannot be used before + io_poll_associate_fd() was called. + On Linux : epoll_ctl(..EPOLL_CTL_MOD) + + - io_poll_wait (TP_file_handle pollfd, native_event *native_events, int maxevents, + int timeout_ms) + + wait until one or more descriptors added with io_poll_associate_fd() + or io_poll_start_read() becomes readable. Data associated with + descriptors can be retrieved from native_events array, using + native_event_get_userdata() function. + + On Linux: epoll_wait() +*/ + +#if defined (__linux__) +#ifndef EPOLLRDHUP +/* Early 2.6 kernel did not have EPOLLRDHUP */ +#define EPOLLRDHUP 0 +#endif +static TP_file_handle io_poll_create() +{ + return epoll_create(1); +} + + +int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void*) +{ + struct epoll_event ev; + ev.data.u64= 0; /* Keep valgrind happy */ + ev.data.ptr= data; + ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; + return epoll_ctl(pollfd, EPOLL_CTL_ADD, fd, &ev); +} + + + +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) +{ + struct epoll_event ev; + ev.data.u64= 0; /* Keep valgrind happy */ + ev.data.ptr= data; + ev.events= EPOLLIN|EPOLLET|EPOLLERR|EPOLLRDHUP|EPOLLONESHOT; + return epoll_ctl(pollfd, EPOLL_CTL_MOD, fd, &ev); +} + +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) +{ + struct epoll_event ev; + return epoll_ctl(pollfd, EPOLL_CTL_DEL, fd, &ev); +} + + +/* + Wrapper around epoll_wait. + NOTE - in case of EINTR, it restarts with original timeout. Since we use + either infinite or 0 timeouts, this is not critical +*/ +int io_poll_wait(TP_file_handle pollfd, native_event *native_events, int maxevents, + int timeout_ms) +{ + int ret; + do + { + ret = epoll_wait(pollfd, native_events, maxevents, timeout_ms); + } + while(ret == -1 && errno == EINTR); + return ret; +} + + +static void *native_event_get_userdata(native_event *event) +{ + return event->data.ptr; +} + +#elif defined(HAVE_KQUEUE) + +/* + NetBSD prior to 9.99.17 is incompatible with other BSDs, last parameter + in EV_SET macro (udata, user data) needs to be intptr_t, whereas it needs + to be void* everywhere else. +*/ + +#ifdef __NetBSD__ +#include <sys/param.h> +# if !__NetBSD_Prereq__(9,99,17) +#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, (intptr_t)g) +# endif +#endif + +#ifndef MY_EV_SET +#define MY_EV_SET(a, b, c, d, e, f, g) EV_SET(a, b, c, d, e, f, g) +#endif + + +TP_file_handle io_poll_create() +{ + return kqueue(); +} + +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data,void *) +{ + struct kevent ke; + MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, + 0, 0, data); + return kevent(pollfd, &ke, 1, 0, 0, 0); +} + + +int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data,void *) +{ + struct kevent ke; + MY_EV_SET(&ke, fd, EVFILT_READ, EV_ADD|EV_ONESHOT, + 0, 0, data); + return io_poll_start_read(pollfd,fd, data, 0); +} + + +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) +{ + struct kevent ke; + MY_EV_SET(&ke,fd, EVFILT_READ, EV_DELETE, 0, 0, 0); + return kevent(pollfd, &ke, 1, 0, 0, 0); +} + + +int io_poll_wait(TP_file_handle pollfd, struct kevent *events, int maxevents, int timeout_ms) +{ + struct timespec ts; + int ret; + if (timeout_ms >= 0) + { + ts.tv_sec= timeout_ms/1000; + ts.tv_nsec= (timeout_ms%1000)*1000000; + } + do + { + ret= kevent(pollfd, 0, 0, events, maxevents, + (timeout_ms >= 0)?&ts:NULL); + } + while (ret == -1 && errno == EINTR); + return ret; +} + +static void* native_event_get_userdata(native_event *event) +{ + return (void *)event->udata; +} + +#elif defined (__sun) + +static TP_file_handle io_poll_create() +{ + return port_create(); +} + +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) +{ + return port_associate(pollfd, PORT_SOURCE_FD, fd, POLLIN, data); +} + +static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *) +{ + return io_poll_start_read(pollfd, fd, data, 0); +} + +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) +{ + return port_dissociate(pollfd, PORT_SOURCE_FD, fd); +} + +int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms) +{ + struct timespec ts; + int ret; + uint_t nget= 1; + if (timeout_ms >= 0) + { + ts.tv_sec= timeout_ms/1000; + ts.tv_nsec= (timeout_ms%1000)*1000000; + } + do + { + ret= port_getn(pollfd, events, maxevents, &nget, + (timeout_ms >= 0)?&ts:NULL); + } + while (ret == -1 && errno == EINTR); + DBUG_ASSERT(nget < INT_MAX); + return (int)nget; +} + +static void* native_event_get_userdata(native_event *event) +{ + return event->portev_user; +} + +#elif defined(HAVE_IOCP) + + +static TP_file_handle io_poll_create() +{ + return CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); +} + + +int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt) +{ + static char c; + TP_connection_generic *con= (TP_connection_generic *)opt; + OVERLAPPED *overlapped= &con->overlapped; + if (con->vio_type == VIO_TYPE_NAMEDPIPE) + { + if (ReadFile(fd, &c, 0, NULL, overlapped)) + return 0; + } + else + { + WSABUF buf; + buf.buf= &c; + buf.len= 0; + DWORD flags=0; + + if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0) + return 0; + } + + if (GetLastError() == ERROR_IO_PENDING) + return 0; + + return 1; +} + + +static int io_poll_associate_fd(TP_file_handle pollfd, TP_file_handle fd, void *data, void *opt) +{ + HANDLE h= CreateIoCompletionPort(fd, pollfd, (ULONG_PTR)data, 0); + if (!h) + return -1; + return io_poll_start_read(pollfd,fd, 0, opt); +} + + +typedef LONG NTSTATUS; + +typedef struct _IO_STATUS_BLOCK { + union { + NTSTATUS Status; + PVOID Pointer; + }; + ULONG_PTR Information; +} IO_STATUS_BLOCK, * PIO_STATUS_BLOCK; + +struct FILE_COMPLETION_INFORMATION { + HANDLE Port; + PVOID Key; +}; + +enum FILE_INFORMATION_CLASS { + FileReplaceCompletionInformation = 0x3D +}; + + +typedef NTSTATUS(WINAPI* pNtSetInformationFile)(HANDLE, PIO_STATUS_BLOCK, PVOID, ULONG, FILE_INFORMATION_CLASS); + +int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd) +{ + static pNtSetInformationFile my_NtSetInformationFile = (pNtSetInformationFile) + GetProcAddress(GetModuleHandle("ntdll"), "NtSetInformationFile"); + if (!my_NtSetInformationFile) + return -1; /* unexpected, we only support Windows 8.1+*/ + IO_STATUS_BLOCK iosb{}; + FILE_COMPLETION_INFORMATION fci{}; + if (my_NtSetInformationFile(fd,&iosb,&fci,sizeof(fci),FileReplaceCompletionInformation)) + return -1; + return 0; +} + + +int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms) +{ + ULONG n; + BOOL ok = GetQueuedCompletionStatusEx(pollfd, events, + maxevents, &n, timeout_ms, FALSE); + + return ok ? (int)n : -1; +} + + +static void* native_event_get_userdata(native_event *event) +{ + return (void *)event->lpCompletionKey; +} +#endif + + +/* Dequeue element from a workqueue */ + +static TP_connection_generic *queue_get(thread_group_t *thread_group) +{ + DBUG_ENTER("queue_get"); + thread_group->queue_event_count++; + TP_connection_generic *c; + for (int i=0; i < NQUEUES;i++) + { + c= thread_group->queues[i].pop_front(); + if (c) + DBUG_RETURN(c); + } + DBUG_RETURN(0); +} + +static TP_connection_generic* queue_get(thread_group_t* group, operation_origin origin) +{ + auto ret = queue_get(group); + if (ret) + { + TP_INCREMENT_GROUP_COUNTER(group, dequeues[(int)origin]); + } + return ret; +} + +static bool is_queue_empty(thread_group_t *thread_group) +{ + for (int i=0; i < NQUEUES; i++) + { + if (!thread_group->queues[i].is_empty()) + return false; + } + return true; +} + + +static void queue_init(thread_group_t *thread_group) +{ + for (int i=0; i < NQUEUES; i++) + { + thread_group->queues[i].empty(); + } +} + +static void queue_put(thread_group_t *thread_group, native_event *ev, int cnt) +{ + ulonglong now= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime; + for(int i=0; i < cnt; i++) + { + TP_connection_generic *c = (TP_connection_generic *)native_event_get_userdata(&ev[i]); + c->enqueue_time= now; + thread_group->queues[c->priority].push_back(c); + } +} + +/* + Handle wait timeout : + Find connections that have been idle for too long and kill them. + Also, recalculate time when next timeout check should run. +*/ + +static my_bool timeout_check(THD *thd, pool_timer_t *timer) +{ + DBUG_ENTER("timeout_check"); + if (thd->net.reading_or_writing == 1) + { + TP_connection_generic *connection= (TP_connection_generic *)thd->event_scheduler.data; + if (!connection || connection->state != TP_STATE_IDLE) + { + /* + Connection does not have scheduler data. This happens for example + if THD belongs to a different scheduler, that is listening to extra_port. + */ + DBUG_RETURN(0); + } + + if(connection->abs_wait_timeout < timer->current_microtime) + { + tp_timeout_handler(connection); + } + else + { + if (connection->abs_wait_timeout < timer->current_microtime) + tp_timeout_handler(connection); + else + set_next_timeout_check(connection->abs_wait_timeout); + } + } + DBUG_RETURN(0); +} + + +/* + Timer thread. + + Periodically, check if one of the thread groups is stalled. Stalls happen if + events are not being dequeued from the queue, or from the network, Primary + reason for stall can be a lengthy executing non-blocking request. It could + also happen that thread is waiting but wait_begin/wait_end is forgotten by + storage engine. Timer thread will create a new thread in group in case of + a stall. + + Besides checking for stalls, timer thread is also responsible for terminating + clients that have been idle for longer than wait_timeout seconds. + + TODO: Let the timer sleep for long time if there is no work to be done. + Currently it wakes up rather often on and idle server. +*/ + +static void* timer_thread(void *param) +{ + uint i; + pool_timer_t* timer=(pool_timer_t *)param; + + my_thread_init(); + DBUG_ENTER("timer_thread"); + timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(), + std::memory_order_relaxed); + timer->current_microtime= microsecond_interval_timer(); + + for(;;) + { + struct timespec ts; + int err; + + set_timespec_nsec(ts,timer->tick_interval*1000000); + mysql_mutex_lock(&timer->mutex); + err= mysql_cond_timedwait(&timer->cond, &timer->mutex, &ts); + if (timer->shutdown) + { + mysql_mutex_unlock(&timer->mutex); + break; + } + if (err == ETIMEDOUT) + { + timer->current_microtime= microsecond_interval_timer(); + + /* Check stalls in thread groups */ + for (i= 0; i < threadpool_max_size; i++) + { + if(all_groups[i].connection_count) + check_stall(&all_groups[i]); + } + + /* Check if any client exceeded wait_timeout */ + if (timer->next_timeout_check.load(std::memory_order_relaxed) <= + timer->current_microtime) + { + /* Reset next timeout check, it will be recalculated below */ + timer->next_timeout_check.store(std::numeric_limits<uint64_t>::max(), + std::memory_order_relaxed); + server_threads.iterate(timeout_check, timer); + } + } + mysql_mutex_unlock(&timer->mutex); + } + + mysql_mutex_destroy(&timer->mutex); + my_thread_end(); + return NULL; +} + + + +void check_stall(thread_group_t *thread_group) +{ + mysql_mutex_lock(&thread_group->mutex); + + /* + Bump priority for the low priority connections that spent too much + time in low prio queue. + */ + TP_connection_generic *c; + for (;;) + { + c= thread_group->queues[TP_PRIORITY_LOW].front(); + if (c && pool_timer.current_microtime - c->enqueue_time > 1000ULL * threadpool_prio_kickup_timer) + { + thread_group->queues[TP_PRIORITY_LOW].remove(c); + thread_group->queues[TP_PRIORITY_HIGH].push_back(c); + } + else + break; + } + + /* + Check if listener is present. If not, check whether any IO + events were dequeued since last time. If not, this means + listener is either in tight loop or thd_wait_begin() + was forgotten. Create a new worker(it will make itself listener). + */ + if (!thread_group->listener && !thread_group->io_event_count) + { + wake_or_create_thread(thread_group, true); + mysql_mutex_unlock(&thread_group->mutex); + return; + } + + /* Reset io event count */ + thread_group->io_event_count= 0; + + /* + Check whether requests from the workqueue are being dequeued. + + The stall detection and resolution works as follows: + + 1. There is a counter thread_group->queue_event_count for the number of + events removed from the queue. Timer resets the counter to 0 on each run. + 2. Timer determines stall if this counter remains 0 since last check + and the queue is not empty. + 3. Once timer determined a stall it sets thread_group->stalled flag and + wakes and idle worker (or creates a new one, subject to throttling). + 4. The stalled flag is reset, when an event is dequeued. + + Q : Will this handling lead to an unbound growth of threads, if queue + stalls permanently? + A : No. If queue stalls permanently, it is an indication for many very long + simultaneous queries. The maximum number of simultanoues queries is + max_connections, further we have threadpool_max_threads limit, upon which no + worker threads are created. So in case there is a flood of very long + queries, threadpool would slowly approach thread-per-connection behavior. + NOTE: + If long queries never wait, creation of the new threads is done by timer, + so it is slower than in real thread-per-connection. However if long queries + do wait and indicate that via thd_wait_begin/end callbacks, thread creation + will be faster. + */ + if (!is_queue_empty(thread_group) && !thread_group->queue_event_count) + { + thread_group->stalled= true; + TP_INCREMENT_GROUP_COUNTER(thread_group,stalls); + wake_or_create_thread(thread_group,true); + } + + /* Reset queue event count */ + thread_group->queue_event_count= 0; + + mysql_mutex_unlock(&thread_group->mutex); +} + + +static void start_timer(pool_timer_t* timer) +{ + DBUG_ENTER("start_timer"); + mysql_mutex_init(key_timer_mutex,&timer->mutex, NULL); + mysql_cond_init(key_timer_cond, &timer->cond, NULL); + timer->shutdown = false; + mysql_thread_create(key_timer_thread, &timer->timer_thread_id, NULL, + timer_thread, timer); + DBUG_VOID_RETURN; +} + + +static void stop_timer(pool_timer_t *timer) +{ + DBUG_ENTER("stop_timer"); + mysql_mutex_lock(&timer->mutex); + timer->shutdown = true; + mysql_cond_signal(&timer->cond); + mysql_mutex_unlock(&timer->mutex); + pthread_join(timer->timer_thread_id, NULL); + DBUG_VOID_RETURN; +} + + +/** + Poll for socket events and distribute them to worker threads + In many case current thread will handle single event itself. + + @return a ready connection, or NULL on shutdown +*/ +static TP_connection_generic * listener(worker_thread_t *current_thread, + thread_group_t *thread_group) +{ + DBUG_ENTER("listener"); + TP_connection_generic *retval= NULL; + + for(;;) + { + native_event ev[MAX_EVENTS]; + int cnt; + + if (thread_group->shutdown) + break; + + cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, -1); + TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::LISTENER]); + if (cnt <=0) + { + DBUG_ASSERT(thread_group->shutdown); + break; + } + + mysql_mutex_lock(&thread_group->mutex); + + if (thread_group->shutdown) + { + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + thread_group->io_event_count += cnt; + + /* + We got some network events and need to make decisions : whether + listener hould handle events and whether or not any wake worker + threads so they can handle events. + + Q1 : Should listener handle an event itself, or put all events into + queue and let workers handle the events? + + Solution : + Generally, listener that handles events itself is preferable. We do not + want listener thread to change its state from waiting to running too + often, Since listener has just woken from poll, it better uses its time + slice and does some work. Besides, not handling events means they go to + the queue, and often to wake another worker must wake up to handle the + event. This is not good, as we want to avoid wakeups. + + The downside of listener that also handles queries is that we can + potentially leave thread group for long time not picking the new + network events. It is not a major problem, because this stall will be + detected sooner or later by the timer thread. Still, relying on timer + is not always good, because it may "tick" too slow (large timer_interval) + + We use following strategy to solve this problem - if queue was not empty + we suspect flood of network events and listener stays, Otherwise, it + handles a query. + + Q2: If queue is not empty, how many workers to wake? + + Solution: + We generally try to keep one thread per group active (threads handling + queries are considered active, unless they stuck in inside some "wait") + Thus, we will wake only one worker, and only if there is not active + threads currently,and listener is not going to handle a query. When we + don't wake, we hope that currently active threads will finish fast and + handle the queue. If this does not happen, timer thread will detect stall + and wake a worker. + + NOTE: Currently nothing is done to detect or prevent long queuing times. + A solution for the future would be to give up "one active thread per + group" principle, if events stay in the queue for too long, and just wake + more workers. + */ + + bool listener_picks_event=is_queue_empty(thread_group) && !threadpool_dedicated_listener; + queue_put(thread_group, ev, cnt); + if (listener_picks_event) + { + /* Handle the first event. */ + retval= queue_get(thread_group, operation_origin::LISTENER); + mysql_mutex_unlock(&thread_group->mutex); + break; + } + + if(thread_group->active_thread_count==0) + { + /* We added some work items to queue, now wake a worker. */ + if(wake_thread(thread_group, false)) + { + /* + Wake failed, hence groups has no idle threads. Now check if there are + any threads in the group except listener. + */ + if(thread_group->thread_count == 1) + { + /* + Currently there is no worker thread in the group, as indicated by + thread_count == 1 (this means listener is the only one thread in + the group). + The queue is not empty, and listener is not going to handle + events. In order to drain the queue, we create a worker here. + Alternatively, we could just rely on timer to detect stall, and + create thread, but waiting for timer would be an inefficient and + pointless delay. + */ + create_worker(thread_group, false); + } + } + } + mysql_mutex_unlock(&thread_group->mutex); + } + + DBUG_RETURN(retval); +} + +/** + Adjust thread counters in group or global + whenever thread is created or is about to exit + + @param thread_group + @param count - 1, when new thread is created + -1, when thread is about to exit +*/ + +static void add_thread_count(thread_group_t *thread_group, int32 count) +{ + thread_group->thread_count += count; + /* worker starts out and end in "active" state */ + thread_group->active_thread_count += count; + tp_stats.num_worker_threads+= count; +} + + +/** + Creates a new worker thread. + thread_mutex must be held when calling this function + + NOTE: in rare cases, the number of threads can exceed + threadpool_max_threads, because we need at least 2 threads + per group to prevent deadlocks (one listener + one worker) +*/ + +static int create_worker(thread_group_t *thread_group, bool due_to_stall) +{ + pthread_t thread_id; + bool max_threads_reached= false; + int err; + + DBUG_ENTER("create_worker"); + if (tp_stats.num_worker_threads >= threadpool_max_threads + && thread_group->thread_count >= 2) + { + err= 1; + max_threads_reached= true; + goto end; + } + + err= mysql_thread_create(key_worker_thread, &thread_id, + thread_group->pthread_attr, worker_main, thread_group); + if (!err) + { + thread_group->last_thread_creation_time=microsecond_interval_timer(); + statistic_increment(thread_created,&LOCK_status); + add_thread_count(thread_group, 1); + TP_INCREMENT_GROUP_COUNTER(thread_group,thread_creations); + if(due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); + } + } + else + { + my_errno= errno; + } + +end: + if (err) + print_pool_blocked_message(max_threads_reached); + else + pool_block_start= 0; /* Reset pool blocked timer, if it was set */ + + DBUG_RETURN(err); +} + + +/** + Calculate microseconds throttling delay for thread creation. + + The value depends on how many threads are already in the group: + small number of threads means no delay, the more threads the larger + the delay. + + The actual values were not calculated using any scientific methods. + They just look right, and behave well in practice. +*/ + +#define THROTTLING_FACTOR (threadpool_stall_limit/std::max(DEFAULT_THREADPOOL_STALL_LIMIT,threadpool_stall_limit)) + +static ulonglong microsecond_throttling_interval(thread_group_t *thread_group) +{ + int count= thread_group->thread_count; + + if (count < 1+ (int)threadpool_oversubscribe) + return 0; + + if (count < 8) + return 50*1000*THROTTLING_FACTOR; + + if(count < 16) + return 100*1000*THROTTLING_FACTOR; + + return 200*100*THROTTLING_FACTOR; +} + + +/** + Wakes a worker thread, or creates a new one. + + Worker creation is throttled, so we avoid too many threads + to be created during the short time. +*/ +static int wake_or_create_thread(thread_group_t *thread_group, bool due_to_stall) +{ + DBUG_ENTER("wake_or_create_thread"); + + if (thread_group->shutdown) + DBUG_RETURN(0); + + if (wake_thread(thread_group, due_to_stall) == 0) + { + DBUG_RETURN(0); + } + + if (thread_group->thread_count > thread_group->connection_count) + DBUG_RETURN(-1); + + + if (thread_group->active_thread_count == 0) + { + /* + We're better off creating a new thread here with no delay, either there + are no workers at all, or they all are all blocking and there was no + idle thread to wakeup. Smells like a potential deadlock or very slowly + executing requests, e.g sleeps or user locks. + */ + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + ulonglong now = microsecond_interval_timer(); + ulonglong time_since_last_thread_created = + (now - thread_group->last_thread_creation_time); + + /* Throttle thread creation. */ + if (time_since_last_thread_created > + microsecond_throttling_interval(thread_group)) + { + DBUG_RETURN(create_worker(thread_group, due_to_stall)); + } + + TP_INCREMENT_GROUP_COUNTER(thread_group,throttles); + DBUG_RETURN(-1); +} + + + +int thread_group_init(thread_group_t *thread_group, pthread_attr_t* thread_attr) +{ + DBUG_ENTER("thread_group_init"); + thread_group->pthread_attr = thread_attr; + mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); + thread_group->pollfd= INVALID_HANDLE_VALUE; + thread_group->shutdown_pipe[0]= -1; + thread_group->shutdown_pipe[1]= -1; + queue_init(thread_group); + DBUG_RETURN(0); +} + + +void thread_group_destroy(thread_group_t *thread_group) +{ + mysql_mutex_destroy(&thread_group->mutex); + if (thread_group->pollfd != INVALID_HANDLE_VALUE) + { + io_poll_close(thread_group->pollfd); + thread_group->pollfd= INVALID_HANDLE_VALUE; + } +#ifndef HAVE_IOCP + for(int i=0; i < 2; i++) + { + if(thread_group->shutdown_pipe[i] != -1) + { + close(thread_group->shutdown_pipe[i]); + thread_group->shutdown_pipe[i]= -1; + } + } +#endif + + if (!--shutdown_group_count) + { + my_free(all_groups); + all_groups= 0; + } +} + +/** + Wake sleeping thread from waiting list +*/ + +static int wake_thread(thread_group_t *thread_group,bool due_to_stall) +{ + DBUG_ENTER("wake_thread"); + worker_thread_t *thread = thread_group->waiting_threads.front(); + if(thread) + { + thread->woken= true; + thread_group->waiting_threads.remove(thread); + mysql_cond_signal(&thread->cond); + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes); + if (due_to_stall) + { + TP_INCREMENT_GROUP_COUNTER(thread_group, wakes_due_to_stall); + } + DBUG_RETURN(0); + } + DBUG_RETURN(1); /* no thread in waiter list => missed wakeup */ +} + +/* + Wake listener thread (during shutdown) + Self-pipe trick is used in most cases,except IOCP. +*/ +static int wake_listener(thread_group_t *thread_group) +{ +#ifndef HAVE_IOCP + if (pipe(thread_group->shutdown_pipe)) + { + return -1; + } + + /* Wake listener */ + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], NULL, NULL)) + { + return -1; + } + char c= 0; + if (write(thread_group->shutdown_pipe[1], &c, 1) < 0) + return -1; +#else + PostQueuedCompletionStatus(thread_group->pollfd, 0, 0, 0); +#endif + return 0; +} +/** + Initiate shutdown for thread group. + + The shutdown is asynchronous, we only care to wake all threads in here, so + they can finish. We do not wait here until threads terminate. Final cleanup + of the group (thread_group_destroy) will be done by the last exiting threads. +*/ + +static void thread_group_close(thread_group_t *thread_group) +{ + DBUG_ENTER("thread_group_close"); + + mysql_mutex_lock(&thread_group->mutex); + if (thread_group->thread_count == 0) + { + mysql_mutex_unlock(&thread_group->mutex); + thread_group_destroy(thread_group); + DBUG_VOID_RETURN; + } + + thread_group->shutdown= true; + thread_group->listener= NULL; + + wake_listener(thread_group); + + /* Wake all workers. */ + while(wake_thread(thread_group, false) == 0) + { + } + + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_VOID_RETURN; +} + + +/* + Add work to the queue. Maybe wake a worker if they all sleep. + + Currently, this function is only used when new connections need to + perform login (this is done in worker threads). + +*/ + +static void queue_put(thread_group_t *thread_group, TP_connection_generic *connection) +{ + DBUG_ENTER("queue_put"); + + connection->enqueue_time= threadpool_exact_stats?microsecond_interval_timer():pool_timer.current_microtime; + thread_group->queues[connection->priority].push_back(connection); + + if (thread_group->active_thread_count == 0) + wake_or_create_thread(thread_group); + + DBUG_VOID_RETURN; +} + + +/* + Prevent too many threads executing at the same time,if the workload is + not CPU bound. +*/ + +static bool too_many_threads(thread_group_t *thread_group) +{ + return (thread_group->active_thread_count >= 1+(int)threadpool_oversubscribe + && !thread_group->stalled); +} + + +/** + Retrieve a connection with pending event. + + Pending event in our case means that there is either a pending login request + (if connection is not yet logged in), or there are unread bytes on the socket. + + If there are no pending events currently, thread will wait. + If timeout specified in abstime parameter passes, the function returns NULL. + + @param current_thread - current worker thread + @param thread_group - current thread group + @param abstime - absolute wait timeout + + @return + connection with pending event. + NULL is returned if timeout has expired,or on shutdown. +*/ + +TP_connection_generic *get_event(worker_thread_t *current_thread, + thread_group_t *thread_group, struct timespec *abstime) +{ + DBUG_ENTER("get_event"); + TP_connection_generic *connection = NULL; + + + mysql_mutex_lock(&thread_group->mutex); + DBUG_ASSERT(thread_group->active_thread_count >= 0); + + for(;;) + { + int err=0; + bool oversubscribed = too_many_threads(thread_group); + if (thread_group->shutdown) + break; + + /* Check if queue is not empty */ + if (!oversubscribed) + { + connection = queue_get(thread_group, operation_origin::WORKER); + if(connection) + { + break; + } + } + + /* If there is currently no listener in the group, become one. */ + if(!thread_group->listener) + { + thread_group->listener= current_thread; + thread_group->active_thread_count--; + mysql_mutex_unlock(&thread_group->mutex); + + connection = listener(current_thread, thread_group); + + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + /* There is no listener anymore, it just returned. */ + thread_group->listener= NULL; + break; + } + + + /* + Last thing we try before going to sleep is to + non-blocking event poll, i.e with timeout = 0. + If this returns events, pick one + */ + if (!oversubscribed) + { + native_event ev[MAX_EVENTS]; + int cnt = io_poll_wait(thread_group->pollfd, ev, MAX_EVENTS, 0); + TP_INCREMENT_GROUP_COUNTER(thread_group, polls[(int)operation_origin::WORKER]); + if (cnt > 0) + { + queue_put(thread_group, ev, cnt); + connection= queue_get(thread_group,operation_origin::WORKER); + break; + } + } + + + /* And now, finally sleep */ + current_thread->woken = false; /* wake() sets this to true */ + + /* + Add current thread to the head of the waiting list and wait. + It is important to add thread to the head rather than tail + as it ensures LIFO wakeup order (hot caches, working inactivity timeout) + */ + thread_group->waiting_threads.push_front(current_thread); + + thread_group->active_thread_count--; + if (abstime) + { + err = mysql_cond_timedwait(¤t_thread->cond, &thread_group->mutex, + abstime); + } + else + { + err = mysql_cond_wait(¤t_thread->cond, &thread_group->mutex); + } + thread_group->active_thread_count++; + + if (!current_thread->woken) + { + /* + Thread was not signalled by wake(), it might be a spurious wakeup or + a timeout. Anyhow, we need to remove ourselves from the list now. + If thread was explicitly woken, than caller removed us from the list. + */ + thread_group->waiting_threads.remove(current_thread); + } + + if (err) + break; + } + + thread_group->stalled= false; + + mysql_mutex_unlock(&thread_group->mutex); + + DBUG_RETURN(connection); +} + + + +/** + Tells the pool that worker starts waiting on IO, lock, condition, + sleep() or similar. +*/ + +void wait_begin(thread_group_t *thread_group) +{ + DBUG_ENTER("wait_begin"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count--; + + DBUG_ASSERT(thread_group->active_thread_count >=0); + DBUG_ASSERT(thread_group->connection_count > 0); + + if ((thread_group->active_thread_count == 0) && + (!is_queue_empty(thread_group) || !thread_group->listener)) + { + /* + Group might stall while this thread waits, thus wake + or create a worker to prevent stall. + */ + wake_or_create_thread(thread_group); + } + + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + +/** + Tells the pool has finished waiting. +*/ + +void wait_end(thread_group_t *thread_group) +{ + DBUG_ENTER("wait_end"); + mysql_mutex_lock(&thread_group->mutex); + thread_group->active_thread_count++; + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + + +TP_connection * TP_pool_generic::new_connection(CONNECT *c) +{ + return new (std::nothrow) TP_connection_generic(c); +} + +/** + Add a new connection to thread pool +*/ + +void TP_pool_generic::add(TP_connection *c) +{ + DBUG_ENTER("tp_add_connection"); + + TP_connection_generic *connection=(TP_connection_generic *)c; + thread_group_t *thread_group= connection->thread_group; + /* + Add connection to the work queue.Actual logon + will be done by a worker thread. + */ + mysql_mutex_lock(&thread_group->mutex); + queue_put(thread_group, connection); + mysql_mutex_unlock(&thread_group->mutex); + DBUG_VOID_RETURN; +} + + + +/** + MySQL scheduler callback: wait begin +*/ + +void TP_connection_generic::wait_begin(int type) +{ + DBUG_ENTER("wait_begin"); + + DBUG_ASSERT(!waiting); + waiting++; + if (waiting == 1) + ::wait_begin(thread_group); + DBUG_VOID_RETURN; +} + + +/** + MySQL scheduler callback: wait end +*/ + +void TP_connection_generic::wait_end() +{ + DBUG_ENTER("wait_end"); + DBUG_ASSERT(waiting); + waiting--; + if (waiting == 0) + ::wait_end(thread_group); + DBUG_VOID_RETURN; +} + + +static void set_next_timeout_check(ulonglong abstime) +{ + auto old= pool_timer.next_timeout_check.load(std::memory_order_relaxed); + DBUG_ENTER("set_next_timeout_check"); + while (abstime < old) + { + if (pool_timer.next_timeout_check. + compare_exchange_weak(old, abstime, + std::memory_order_relaxed, + std::memory_order_relaxed)) + break; + } + DBUG_VOID_RETURN; +} + +static size_t get_group_id(my_thread_id tid) +{ + return size_t(tid % group_count); +} + + +TP_connection_generic::TP_connection_generic(CONNECT *c): + TP_connection(c), + thread_group(0), + next_in_queue(0), + prev_in_queue(0), + abs_wait_timeout(ULONGLONG_MAX), + bound_to_poll_descriptor(false), + waiting(false), + fix_group(false) +#ifdef HAVE_IOCP +, overlapped() +#endif +#ifdef _WIN32 +, vio_type(c->vio_type) +#endif +{ + DBUG_ASSERT(c->vio_type != VIO_CLOSED); + +#ifdef _WIN32 + fd= (c->vio_type == VIO_TYPE_NAMEDPIPE) ? + c->pipe: (TP_file_handle) mysql_socket_getfd(c->sock); +#else + fd= mysql_socket_getfd(c->sock); +#endif + + /* Assign connection to a group. */ + thread_group_t *group= + &all_groups[get_group_id(c->thread_id)]; + thread_group=group; + + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); +} + +TP_connection_generic::~TP_connection_generic() +{ + mysql_mutex_lock(&thread_group->mutex); + thread_group->connection_count--; + mysql_mutex_unlock(&thread_group->mutex); +} + +/** + Set wait timeout for connection. +*/ + +void TP_connection_generic::set_io_timeout(int timeout_sec) +{ + DBUG_ENTER("set_wait_timeout"); + /* + Calculate wait deadline for this connection. + Instead of using microsecond_interval_timer() which has a syscall + overhead, use pool_timer.current_microtime and take + into account that its value could be off by at most + one tick interval. + */ + + abs_wait_timeout= pool_timer.current_microtime + + 1000LL*pool_timer.tick_interval + + 1000000LL*timeout_sec; + + set_next_timeout_check(abs_wait_timeout); + DBUG_VOID_RETURN; +} + + +/** + Handle a (rare) special case,where connection needs to + migrate to a different group because group_count has changed + after thread_pool_size setting. +*/ + +static int change_group(TP_connection_generic *c, + thread_group_t *old_group, + thread_group_t *new_group) +{ + int ret= 0; + + DBUG_ASSERT(c->thread_group == old_group); + + /* Remove connection from the old group. */ + mysql_mutex_lock(&old_group->mutex); + if (c->bound_to_poll_descriptor) + { + io_poll_disassociate_fd(old_group->pollfd,c->fd); + c->bound_to_poll_descriptor= false; + } + c->thread_group->connection_count--; + mysql_mutex_unlock(&old_group->mutex); + + /* Add connection to the new group. */ + mysql_mutex_lock(&new_group->mutex); + c->thread_group= new_group; + new_group->connection_count++; + /* Ensure that there is a listener in the new group. */ + if (!new_group->thread_count) + ret= create_worker(new_group, false); + mysql_mutex_unlock(&new_group->mutex); + return ret; +} + + +int TP_connection_generic::start_io() +{ + /* + Usually, connection will stay in the same group for the entire + connection's life. However, we do allow group_count to + change at runtime, which means in rare cases when it changes is + connection should need to migrate to another group, this ensures + to ensure equal load between groups. + + So we recalculate in which group the connection should be, based + on thread_id and current group count, and migrate if necessary. + */ + if (fix_group) + { + fix_group = false; + thread_group_t *new_group= &all_groups[get_group_id(thd->thread_id)]; + + if (new_group != thread_group) + { + if (change_group(this, thread_group, new_group)) + return -1; + } + } + + /* + Bind to poll descriptor if not yet done. + */ + if (!bound_to_poll_descriptor) + { + bound_to_poll_descriptor= true; + return io_poll_associate_fd(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); + } + + return io_poll_start_read(thread_group->pollfd, fd, this, OPTIONAL_IO_POLL_READ_PARAM); +} + + + +/** + Worker thread's main +*/ + +static void *worker_main(void *param) +{ + + worker_thread_t this_thread; + pthread_detach_this_thread(); + my_thread_init(); + + DBUG_ENTER("worker_main"); + + thread_group_t *thread_group = (thread_group_t *)param; + + /* Init per-thread structure */ + mysql_cond_init(key_worker_cond, &this_thread.cond, NULL); + this_thread.thread_group= thread_group; + this_thread.event_count=0; + + /* Run event loop */ + for(;;) + { + TP_connection_generic *connection; + struct timespec ts; + set_timespec(ts,threadpool_idle_timeout); + connection = get_event(&this_thread, thread_group, &ts); + if (!connection) + break; + this_thread.event_count++; + tp_callback(connection); + } + + /* Thread shutdown: cleanup per-worker-thread structure. */ + mysql_cond_destroy(&this_thread.cond); + + bool last_thread; /* last thread in group exits */ + mysql_mutex_lock(&thread_group->mutex); + add_thread_count(thread_group, -1); + last_thread= ((thread_group->thread_count == 0) && thread_group->shutdown); + mysql_mutex_unlock(&thread_group->mutex); + + /* Last thread in group exits and pool is terminating, destroy group.*/ + if (last_thread) + thread_group_destroy(thread_group); + + my_thread_end(); + return NULL; +} + + +TP_pool_generic::TP_pool_generic() +{} + +int TP_pool_generic::init() +{ + DBUG_ENTER("TP_pool_generic::TP_pool_generic"); + threadpool_max_size= MY_MAX(threadpool_size, 128); + all_groups= (thread_group_t *) + my_malloc(PSI_INSTRUMENT_ME, + sizeof(thread_group_t) * threadpool_max_size, MYF(MY_WME|MY_ZEROFILL)); + if (!all_groups) + { + threadpool_max_size= 0; + sql_print_error("Allocation failed"); + DBUG_RETURN(-1); + } + PSI_register(mutex); + PSI_register(cond); + PSI_register(thread); + scheduler_init(); + threadpool_started= true; + for (uint i= 0; i < threadpool_max_size; i++) + { + thread_group_init(&all_groups[i], get_connection_attrib()); + } + set_pool_size(threadpool_size); + if(group_count == 0) + { + /* Something went wrong */ + sql_print_error("Can't set threadpool size to %d",threadpool_size); + DBUG_RETURN(-1); + } + pool_timer.tick_interval= threadpool_stall_limit; + start_timer(&pool_timer); + DBUG_RETURN(0); +} + +TP_pool_generic::~TP_pool_generic() +{ + DBUG_ENTER("tp_end"); + + if (!threadpool_started) + DBUG_VOID_RETURN; + + stop_timer(&pool_timer); + shutdown_group_count= threadpool_max_size; + for (uint i= 0; i < threadpool_max_size; i++) + { + thread_group_close(&all_groups[i]); + } + + /* + Wait until memory occupied by all_groups is freed. + */ + int timeout_ms=5000; + while(all_groups && timeout_ms--) + my_sleep(1000); + + threadpool_started= false; + DBUG_VOID_RETURN; +} + + +static my_bool thd_reset_group(THD* thd, void*) +{ + auto c= (TP_connection_generic*)thd->event_scheduler.data; + if(c) + c->fix_group= true; + return FALSE; +} + +/** Ensure that poll descriptors are created when threadpool_size changes */ +int TP_pool_generic::set_pool_size(uint size) +{ + bool success= true; + + for(uint i=0; i< size; i++) + { + thread_group_t *group= &all_groups[i]; + mysql_mutex_lock(&group->mutex); + if (group->pollfd == INVALID_HANDLE_VALUE) + { + group->pollfd= io_poll_create(); + success= (group->pollfd != INVALID_HANDLE_VALUE); + if(!success) + { + sql_print_error("io_poll_create() failed, errno=%d", errno); + } + } + mysql_mutex_unlock(&group->mutex); + if (!success) + { + group_count= i; + return -1; + } + } + group_count= size; + server_threads.iterate(thd_reset_group); + return 0; +} + +int TP_pool_generic::set_stall_limit(uint limit) +{ + mysql_mutex_lock(&(pool_timer.mutex)); + pool_timer.tick_interval= limit; + mysql_mutex_unlock(&(pool_timer.mutex)); + mysql_cond_signal(&(pool_timer.cond)); + return 0; +} + + +/** + Calculate number of idle/waiting threads in the pool. + + Sum idle threads over all groups. + Don't do any locking, it is not required for stats. +*/ + +int TP_pool_generic::get_idle_thread_count() +{ + int sum=0; + for (uint i= 0; i < threadpool_max_size && all_groups[i].pollfd != INVALID_HANDLE_VALUE; i++) + { + sum+= (all_groups[i].thread_count - all_groups[i].active_thread_count); + } + return sum; +} + + +/* Report threadpool problems */ + +/** + Delay in microseconds, after which "pool blocked" message is printed. + (30 sec == 30 Mio usec) +*/ +#define BLOCK_MSG_DELAY (30*1000000) + +#define MAX_THREADS_REACHED_MSG \ +"Threadpool could not create additional thread to handle queries, because the \ +number of allowed threads was reached. Increasing 'thread_pool_max_threads' \ +parameter can help in this situation.\n \ +If 'extra_port' parameter is set, you can still connect to the database with \ +superuser account (it must be TCP connection using extra_port as TCP port) \ +and troubleshoot the situation. \ +A likely cause of pool blocks are clients that lock resources for long time. \ +'show processlist' or 'show engine innodb status' can give additional hints." + +#define CREATE_THREAD_ERROR_MSG "Can't create threads in threadpool (errno=%d)." + +/** + Write a message when blocking situation in threadpool occurs. + The message is written only when pool blocks for BLOCK_MSG_DELAY (30) seconds. + It will be just a single message for each blocking situation (to prevent + log flood). +*/ + +static void print_pool_blocked_message(bool max_threads_reached) +{ + ulonglong now; + static bool msg_written; + + now= microsecond_interval_timer(); + if (pool_block_start == 0) + { + pool_block_start= now; + msg_written = false; + return; + } + + if (now > pool_block_start + BLOCK_MSG_DELAY && !msg_written) + { + if (max_threads_reached) + sql_print_error(MAX_THREADS_REACHED_MSG); + else + sql_print_error(CREATE_THREAD_ERROR_MSG, my_errno); + + sql_print_information("Threadpool has been blocked for %u seconds\n", + (uint)((now- pool_block_start)/1000000)); + /* avoid reperated messages for the same blocking situation */ + msg_written= true; + } +} + +#endif /* HAVE_POOL_OF_THREADS */ |