summaryrefslogtreecommitdiffstats
path: root/sql/threadpool_generic.h
diff options
context:
space:
mode:
Diffstat (limited to 'sql/threadpool_generic.h')
-rw-r--r--sql/threadpool_generic.h157
1 files changed, 157 insertions, 0 deletions
diff --git a/sql/threadpool_generic.h b/sql/threadpool_generic.h
new file mode 100644
index 00000000..b7a35b7c
--- /dev/null
+++ b/sql/threadpool_generic.h
@@ -0,0 +1,157 @@
+/* Copyright(C) 2019, 2020, MariaDB
+ *
+ * This program is free software; you can redistribute itand /or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#if defined (HAVE_POOL_OF_THREADS)
+#include <my_global.h>
+#include <sql_plist.h>
+#include <my_pthread.h>
+#include <mysqld.h>
+#include <threadpool.h>
+#include <violite.h>
+
+#ifdef _WIN32
+#include <windows.h>
+#include "threadpool_winsockets.h"
+/* AIX may define this, too ?*/
+#define HAVE_IOCP
+#endif
+
+
+#ifdef _WIN32
+typedef HANDLE TP_file_handle;
+#else
+typedef int TP_file_handle;
+#define INVALID_HANDLE_VALUE -1
+#endif
+
+#ifdef __linux__
+#include <sys/epoll.h>
+typedef struct epoll_event native_event;
+#elif defined(HAVE_KQUEUE)
+#include <sys/event.h>
+typedef struct kevent native_event;
+#elif defined (__sun)
+#include <port.h>
+typedef port_event_t native_event;
+#elif defined (HAVE_IOCP)
+typedef OVERLAPPED_ENTRY native_event;
+#else
+#error threadpool is not available on this platform
+#endif
+
+struct thread_group_t;
+
+/* Per-thread structure for workers */
+struct worker_thread_t
+{
+ ulonglong event_count; /* number of request handled by this thread */
+ thread_group_t* thread_group;
+ worker_thread_t* next_in_list;
+ worker_thread_t** prev_in_list;
+ mysql_cond_t cond;
+ bool woken;
+};
+
+typedef I_P_List<worker_thread_t, I_P_List_adapter<worker_thread_t,
+ & worker_thread_t::next_in_list,
+ & worker_thread_t::prev_in_list>,
+ I_P_List_counter
+>
+worker_list_t;
+
+struct TP_connection_generic :public TP_connection
+{
+ TP_connection_generic(CONNECT* c);
+ ~TP_connection_generic();
+
+ int init() override { return 0; }
+ void set_io_timeout(int sec) override;
+ int start_io() override;
+ void wait_begin(int type) override;
+ void wait_end() override;
+
+ thread_group_t* thread_group;
+ TP_connection_generic* next_in_queue;
+ TP_connection_generic** prev_in_queue;
+ ulonglong abs_wait_timeout;
+ ulonglong enqueue_time;
+ TP_file_handle fd;
+ bool bound_to_poll_descriptor;
+ int waiting;
+ bool fix_group;
+#ifdef _WIN32
+ win_aiosocket win_sock{};
+ void init_vio(st_vio *vio) override
+ { win_sock.init(vio);}
+#endif
+
+};
+
+
+typedef I_P_List<TP_connection_generic,
+ I_P_List_adapter<TP_connection_generic,
+ & TP_connection_generic::next_in_queue,
+ & TP_connection_generic::prev_in_queue>,
+ I_P_List_counter,
+ I_P_List_fast_push_back<TP_connection_generic> >
+ connection_queue_t;
+
+const int NQUEUES = 2; /* We have high and low priority queues*/
+
+enum class operation_origin
+{
+ WORKER,
+ LISTENER
+};
+
+struct thread_group_counters_t
+{
+ ulonglong thread_creations;
+ ulonglong thread_creations_due_to_stall;
+ ulonglong wakes;
+ ulonglong wakes_due_to_stall;
+ ulonglong throttles;
+ ulonglong stalls;
+ ulonglong dequeues[2];
+ ulonglong polls[2];
+};
+
+struct thread_group_t
+{
+ mysql_mutex_t mutex;
+ connection_queue_t queues[NQUEUES];
+ worker_list_t waiting_threads;
+ worker_thread_t* listener;
+ pthread_attr_t* pthread_attr;
+ TP_file_handle pollfd;
+ int thread_count;
+ int active_thread_count;
+ int connection_count;
+ /* Stats for the deadlock detection timer routine.*/
+ int io_event_count;
+ int queue_event_count;
+ ulonglong last_thread_creation_time;
+ int shutdown_pipe[2];
+ bool shutdown;
+ bool stalled;
+ thread_group_counters_t counters;
+ char pad[CPU_LEVEL1_DCACHE_LINESIZE];
+};
+
+#define TP_INCREMENT_GROUP_COUNTER(group,var) do {group->counters.var++;}while(0)
+
+extern thread_group_t* all_groups;
+#endif
+