diff options
Diffstat (limited to '')
-rw-r--r-- | tpool/tpool.h | 302 |
1 files changed, 302 insertions, 0 deletions
diff --git a/tpool/tpool.h b/tpool/tpool.h new file mode 100644 index 00000000..2f87bcd4 --- /dev/null +++ b/tpool/tpool.h @@ -0,0 +1,302 @@ +/* Copyright (C) 2019, 2021, MariaDB Corporation. + +This program is free software; you can redistribute itand /or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/ + +#pragma once +#include <memory> /* unique_ptr */ +#include <condition_variable> +#include <mutex> +#include <atomic> +#include <tpool_structs.h> +#ifdef LINUX_NATIVE_AIO +#include <libaio.h> +#endif +#ifdef HAVE_URING +#include <sys/uio.h> +#endif +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include <windows.h> +/** + Windows-specific native file handle struct. + Apart from the actual handle, contains PTP_IO + used by the Windows threadpool. +*/ +struct native_file_handle +{ + HANDLE m_handle; + PTP_IO m_ptp_io; + native_file_handle(){}; + native_file_handle(HANDLE h) : m_handle(h), m_ptp_io() {} + operator HANDLE() const { return m_handle; } +}; +#else +#include <unistd.h> +typedef int native_file_handle; +#endif + +namespace tpool +{ +/** + Task callback function + */ +typedef void (*callback_func)(void *); +typedef void (*callback_func_np)(void); +class task; + +/** A class that can be used e.g. for +restricting concurrency for specific class of tasks. */ + +class task_group +{ +private: + circular_queue<task*> m_queue; + std::mutex m_mtx; + std::condition_variable m_cv; + unsigned int m_tasks_running; + unsigned int m_max_concurrent_tasks; + const bool m_enable_task_release; + +public: + task_group(unsigned int max_concurrency= 100000, bool m_enable_task_release= true); + void set_max_tasks(unsigned int max_concurrent_tasks); + void execute(task* t); + void cancel_pending(task *t); + ~task_group(); +}; + + +class task +{ +public: + callback_func m_func; + void *m_arg; + task_group* m_group; + virtual void add_ref() {}; + virtual void release() {}; + task() {}; + task(callback_func func, void* arg, task_group* group = nullptr); + void* get_arg() { return m_arg; } + callback_func get_func() { return m_func; } + virtual void execute(); + virtual ~task() {} +}; + +class waitable_task :public task +{ + std::mutex m_mtx; + std::condition_variable m_cv; + int m_ref_count; + int m_waiter_count; + callback_func m_original_func; + void wait(std::unique_lock<std::mutex>&lk); +public: + waitable_task(callback_func func, void* arg, task_group* group = nullptr); + void add_ref() override; + void release() override; + TPOOL_SUPPRESS_TSAN bool is_running() { return get_ref_count() > 0; } + TPOOL_SUPPRESS_TSAN int get_ref_count() {return m_ref_count;} + void wait(); + void disable(); + void enable(); + virtual ~waitable_task() {}; +}; +enum class aio_opcode +{ + AIO_PREAD, + AIO_PWRITE +}; +constexpr size_t MAX_AIO_USERDATA_LEN= 4 * sizeof(void*); + +/** IO control block, includes parameters for the IO, and the callback*/ + +struct aiocb +#ifdef _WIN32 + :OVERLAPPED +#elif defined LINUX_NATIVE_AIO + :iocb +#elif defined HAVE_URING + :iovec +#endif +{ + native_file_handle m_fh; + aio_opcode m_opcode; + unsigned long long m_offset; + void *m_buffer; + unsigned int m_len; + callback_func m_callback; + task_group* m_group; + /* Returned length and error code*/ + size_t m_ret_len; + int m_err; + void *m_internal; + task m_internal_task; + alignas(8) char m_userdata[MAX_AIO_USERDATA_LEN]; + + aiocb() : m_internal_task(nullptr, nullptr) + {} + void execute_callback() + { + task t(m_callback, this,m_group); + t.execute(); + } +}; + + +/** + AIO interface +*/ +class aio +{ +public: + /** + Submit asynchronous IO. + On completion, cb->m_callback is executed. + */ + virtual int submit_io(aiocb *cb)= 0; + /** "Bind" file to AIO handler (used on Windows only) */ + virtual int bind(native_file_handle &fd)= 0; + /** "Unind" file to AIO handler (used on Windows only) */ + virtual int unbind(const native_file_handle &fd)= 0; + virtual ~aio(){}; +protected: + static void synchronous(aiocb *cb); + /** finish a partial read/write callback synchronously */ + static inline void finish_synchronous(aiocb *cb) + { + if (!cb->m_err && cb->m_ret_len != cb->m_len) + { + /* partial read/write */ + cb->m_buffer= (char *) cb->m_buffer + cb->m_ret_len; + cb->m_len-= (unsigned int) cb->m_ret_len; + cb->m_offset+= cb->m_ret_len; + synchronous(cb); + } + } +}; + +class timer +{ +public: + virtual void set_time(int initial_delay_ms, int period_ms) = 0; + virtual void disarm() = 0; + virtual ~timer(){} +}; + +class thread_pool; + +extern aio *create_simulated_aio(thread_pool *tp); + +class thread_pool +{ +protected: + /* AIO handler */ + std::unique_ptr<aio> m_aio; + virtual aio *create_native_aio(int max_io)= 0; + + /** + Functions to be called at worker thread start/end + can be used for example to set some TLS variables + */ + void (*m_worker_init_callback)(void); + void (*m_worker_destroy_callback)(void); + +public: + thread_pool() : m_aio(), m_worker_init_callback(), m_worker_destroy_callback() + { + } + virtual void submit_task(task *t)= 0; + virtual timer* create_timer(callback_func func, void *data=nullptr) = 0; + void set_thread_callbacks(void (*init)(), void (*destroy)()) + { + m_worker_init_callback= init; + m_worker_destroy_callback= destroy; + } + int configure_aio(bool use_native_aio, int max_io) + { + if (use_native_aio) + m_aio.reset(create_native_aio(max_io)); + else + m_aio.reset(create_simulated_aio(this)); + return !m_aio ? -1 : 0; + } + + int reconfigure_aio(bool use_native_aio, int max_io) + { + assert(m_aio); + if (use_native_aio) + { + auto new_aio = create_native_aio(max_io); + if (!new_aio) + return -1; + m_aio.reset(new_aio); + } + return 0; + } + + void disable_aio() + { + m_aio.reset(); + } + + /** + Tweaks how fast worker threads are created, or how often they are signaled. + + @param threads - desired number of concurrently active threads + Special value 0 means default. Not the same as max number of threads + in the pool - oversubscription is allowed and stalls are still detected + + @note + It is designed to use with "batch" operations, where huge number + of tasks is submitted in rapid succession. In this case, it is + better to temporarily restrict concurrency, which will make thread + creation throttling more aggressive. + Once the batch is over, restore default concurrency + by calling set_concurrency(0). + */ + virtual void set_concurrency(unsigned int threads=0){} + + int bind(native_file_handle &fd) { return m_aio->bind(fd); } + void unbind(const native_file_handle &fd) { if (m_aio) m_aio->unbind(fd); } + int submit_io(aiocb *cb) { return m_aio->submit_io(cb); } + virtual void wait_begin() {}; + virtual void wait_end() {}; + virtual ~thread_pool() {} +}; +const int DEFAULT_MIN_POOL_THREADS= 1; +const int DEFAULT_MAX_POOL_THREADS= 500; +extern thread_pool * +create_thread_pool_generic(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); +extern "C" void tpool_wait_begin(); +extern "C" void tpool_wait_end(); +#ifdef _WIN32 +extern thread_pool * +create_thread_pool_win(int min_threads= DEFAULT_MIN_POOL_THREADS, + int max_threads= DEFAULT_MAX_POOL_THREADS); + +/* + Helper functions, to execute pread/pwrite even if file is + opened with FILE_FLAG_OVERLAPPED, and bound to completion + port. +*/ +SSIZE_T pwrite(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +SSIZE_T pread(const native_file_handle &h, void *buf, size_t count, + unsigned long long offset); +HANDLE win_get_syncio_event(); +#endif +} // namespace tpool |