/*
 * Copyright (C) 2019-2023 Codership Oy <info@codership.com>
 *
 * This file is part of wsrep-lib.
 *
 * Wsrep-lib is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 2 of the License, or
 * (at your option) any later version.
 *
 * Wsrep-lib is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
 */

/*
 * Instrumented thread service (db::ti): wraps pthread threads, mutexes
 * and condition variables and counts the operations performed on them,
 * globally and per object key. Counters are reported by db::ti::stats().
 */

#include "db_threads.hpp"
#include "wsrep/compiler.hpp"
#include "wsrep/logger.hpp"

#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <new>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <pthread.h>

extern "C"
{
static void* start_thread(void* args_ptr);
}

namespace
{
// Common base of all instrumented objects. It carries no state; only
// its address is handed to the statistics bookkeeping.
struct ti_obj
{
};

// Operation codes which are counted. The numeric values are used as
// indices into total_ops and into the per-key vectors of ops_map.
enum ti_opcode
{
    oc_thread_create,
    oc_thread_destroy,
    oc_mutex_create,
    oc_mutex_destroy,
    oc_mutex_lock,
    oc_mutex_trylock,
    oc_mutex_unlock,
    oc_cond_create,
    oc_cond_destroy,
    oc_cond_wait,
    oc_cond_timedwait,
    oc_cond_signal,
    oc_cond_broadcast,
    oc_max // must be the last
};

// Human readable name of an operation code, "unknown" for
// out-of-range values.
static const char* ti_opstring(enum ti_opcode op)
{
    switch (op)
    {
    case oc_thread_create: return "thread_create";
    case oc_thread_destroy: return "thread_destroy";
    case oc_mutex_create: return "mutex_create";
    case oc_mutex_destroy: return "mutex_destroy";
    case oc_mutex_lock: return "mutex_lock";
    case oc_mutex_trylock: return "mutex_trylock";
    case oc_mutex_unlock: return "mutex_unlock";
    case oc_cond_create: return "cond_create";
    case oc_cond_destroy: return "cond_destroy";
    case oc_cond_wait: return "cond_wait";
    case oc_cond_timedwait: return "cond_timedwait";
    case oc_cond_signal: return "cond_signal";
    case oc_cond_broadcast: return "cond_broadcast";
    default: return "unknown";
    }
}

// Registered key names of the form "<name>_<type>". The key value
// handed out by append_key() is the index into this vector plus one.
static std::vector<std::string> key_vec;
// Number of keys registered so far.
static std::atomic<int> key_cnt;
// Per-key operation counters, indexed like key_vec; each inner vector
// holds oc_max counters.
static std::vector<std::vector<size_t>> ops_map;
// Per-key mutexes guarding the matching ops_map entry. Stored through
// unique_ptr because std::mutex is not movable while the vector grows;
// the heap-allocated mutexes keep stable addresses and are released
// automatically when the vector is destroyed.
static std::vector<std::unique_ptr<std::mutex>> ops_map_sync;
// Operation totals over all keys, indexed by ti_opcode.
static std::array<std::atomic<size_t>, oc_max> total_ops;
// Number of objects which were heap allocated instead of constructed
// in caller-provided memory.
static std::atomic<size_t> total_allocations;
// Number of lock() calls which found the mutex already locked.
static std::atomic<size_t> mutex_contention;
// Contended lock() calls per mutex key name.
static std::unordered_map<std::string, size_t> mutex_contention_counts;
// Statistics collection level, set via db::ti::level().
static int op_level;
// Check correct condition variable usage:
// - Associated mutex must be locked when waiting for cond
// - There must be at least one waiter when signalling for condition
static bool cond_checks;

// If condition variable checks are enabled and condition does not
// hold, log an error mentioning the object name and message, then
// abort the process.
static inline void cond_check(bool condition, const char* name,
                              const char* message)
{
    if (cond_checks && !condition)
    {
        wsrep::log_error() << "Condition variable check failed for '"
                           << name << "': " << message;
        ::abort();
    }
}

// Register a new key named "<name>_<type>" together with its zeroed
// counters and its counter mutex. Returns the new key value (>= 1),
// which callers reinterpret as an opaque key pointer.
// Not synchronized: keys are expected to be created before the
// objects using them are shared between threads.
static inline int append_key(const char* name, const char* type)
{
    key_vec.push_back(std::string(name) + "_" + type);
    ops_map.push_back(std::vector<size_t>(static_cast<size_t>(oc_max)));
    ops_map_sync.push_back(std::unique_ptr<std::mutex>(new std::mutex()));
    return ++key_cnt;
}

// Map an opaque key pointer created from append_key() back to its
// index in key_vec / ops_map / ops_map_sync.
template <class Key>
static inline size_t get_key_index(const Key* key)
{
    size_t index(reinterpret_cast<size_t>(key) - 1);
    assert(index < key_vec.size());
    return index;
}

// Registered name of the given key.
template <class Key>
static inline const char* get_key_name(const Key* key)
{
    return key_vec[get_key_index(key)].c_str();
}

// Registered name of the key at the given index.
static inline const std::string& get_key_name_by_index(size_t index)
{
    assert(index < key_vec.size());
    return key_vec[index];
}

// Note: Do not dereference the obj pointer in update_ops() below; the
// object it points to may have been deleted before the call.
template static inline void update_ops(const ti_obj* obj, const Key* key, enum ti_opcode op) { if (op_level < 1) return; total_ops[op] += 1; if (op_level < 2) return; if (false && op == oc_mutex_destroy) { wsrep::log_info() << "thread: " << std::this_thread::get_id() << " object: " << obj << ": name: " << get_key_name(key) << " op: " << ti_opstring(op); } std::lock_guard lock(*ops_map_sync[get_key_index(key)]); ops_map[get_key_index(key)][op] += 1; } struct thread_args { void* this_thread; void* (*fn)(void*); void* args; }; class ti_thread : public ti_obj { public: ti_thread(const wsrep::thread_service::thread_key* key) : key_(key) , th_() , retval_() , detached_() { update_ops(this, key_, oc_thread_create); } ~ti_thread() { update_ops(this, key_, oc_thread_destroy); } ti_thread(const ti_thread&) = delete; ti_thread& operator=(const ti_thread&) = delete; int run(void* (*fn)(void *), void* args) { auto ta(new thread_args{this, fn, args}); return pthread_create(&th_, nullptr, start_thread, ta); } int detach() { detached_ = true; return pthread_detach(th_); } int join(void** retval) { return pthread_join(th_, retval); } bool detached() const { return detached_; } void retval(void* retval) { retval_ = retval; } static ti_thread* self(); int setschedparam(int policy, const struct sched_param* param) { return pthread_setschedparam(th_, policy, param); } int getschedparam(int* policy, struct sched_param* param) { return pthread_getschedparam(th_, policy, param); } int equal(ti_thread* other) { return pthread_equal(th_, other->th_); } private: const wsrep::thread_service::thread_key* key_; pthread_t th_; void* retval_; bool detached_; }; thread_local ti_thread* this_ti_thread = nullptr; static bool main_thread_initializer() { const auto* main_thread_key = reinterpret_cast( append_key("main", "thread")); static ti_thread main_thread(main_thread_key); this_ti_thread = &main_thread; return true; } static bool main_thread_init = main_thread_initializer(); ti_thread* 
ti_thread::self() { return this_ti_thread; } class ti_mutex : public ti_obj { public: ti_mutex(const wsrep::thread_service::mutex_key* key, bool inplace) : mutex_(PTHREAD_MUTEX_INITIALIZER) , key_(key) , inplace_(inplace) #ifndef NDEBUG , locked_() , owner_() #endif // ! NDEBUG { update_ops(this, key_, oc_mutex_create); if (not inplace) total_allocations++; } ~ti_mutex() { update_ops(this, key_, oc_mutex_destroy); } ti_mutex& operator=(const ti_mutex&) = delete; ti_mutex(const ti_mutex&) = delete; int lock() { update_ops(this, key_, oc_mutex_lock); int ret(pthread_mutex_trylock(&mutex_)); if (ret == EBUSY) { mutex_contention++; { std::lock_guard lock(*ops_map_sync[get_key_index(key_)]); mutex_contention_counts[get_key_name(key_)] += 1; } ret = pthread_mutex_lock(&mutex_); } #ifndef NDEBUG if (ret == 0) { assert(owner_ == std::thread::id()); locked_ = true; owner_ = std::this_thread::get_id(); } #endif // ! NDEBUG return ret; } int trylock() { update_ops(this, key_, oc_mutex_trylock); int ret(pthread_mutex_trylock(&mutex_)); #ifndef NDEBUG if (ret == 0) { assert(owner_ == std::thread::id()); locked_ = true; owner_ = std::this_thread::get_id(); } #endif // ! NDEBUG return ret; } int unlock() { assert(locked_); #ifndef NDEBUG assert(owner_ == std::this_thread::get_id()); owner_ = std::thread::id(); #endif // ! NDEBUG // Use temporary object. After mutex is unlocked it may be // destroyed before this update_ops() finishes. auto key(key_); int ret(pthread_mutex_unlock(&mutex_)); update_ops(this, key, oc_mutex_unlock); return ret; } struct condwait_context { #ifndef NDEBUG bool locked; std::thread::id owner; #endif // ! NDEBUG }; condwait_context save_for_condwait() { #ifndef NDEBUG return condwait_context{ locked_, owner_ }; #else return condwait_context{}; #endif // ! NDEBUG } void reset() { #ifndef NDEBUG locked_ = false; owner_ = std::thread::id(); #endif // ! 
NDEBUG } void restore_from_condwait(const condwait_context& ctx WSREP_UNUSED) { #ifndef NDEBUG locked_ = ctx.locked; owner_ = ctx.owner; #endif // ! NDEBUG } pthread_mutex_t* native_handle() { return &mutex_; } const wsrep::thread_service::mutex_key* key() const { return key_; } bool inplace() const { return inplace_; } private: pthread_mutex_t mutex_; const wsrep::thread_service::mutex_key* key_; const bool inplace_; #ifndef NDEBUG bool locked_; std::atomic owner_; #endif // ! NDEBU }; class ti_cond : public ti_obj { public: ti_cond(const wsrep::thread_service::cond_key* key, bool inplace) : cond_(PTHREAD_COND_INITIALIZER) , key_(key) , inplace_(inplace) , waiter_() { update_ops(this, key_, oc_cond_create); if (not inplace) total_allocations++; } ~ti_cond() { update_ops(this, key_, oc_cond_destroy); } ti_cond& operator=(const ti_cond&) = delete; ti_cond(const ti_cond&) = delete; int wait(ti_mutex& mutex) { cond_check(pthread_mutex_trylock(mutex.native_handle()), get_key_name(key_), "Mutex not locked in cond wait"); waiter_ = true; update_ops(this, key_, oc_cond_wait); // update_ops(&mutex, mutex.key(), oc_mutex_unlock); auto condwait_ctx(mutex.save_for_condwait()); mutex.reset(); int ret(pthread_cond_wait(&cond_, mutex.native_handle())); // update_ops(&mutex, mutex.key(), oc_mutex_lock); mutex.restore_from_condwait(condwait_ctx); waiter_ = false; return ret; } int timedwait(ti_mutex& mutex, const struct timespec* ts) { cond_check(pthread_mutex_trylock(mutex.native_handle()), get_key_name(key_), "Mutex not locked in cond wait"); waiter_ = true; update_ops(this, key_, oc_cond_timedwait); // update_ops(&mutex, mutex.key(), oc_mutex_unlock); auto condwait_ctx(mutex.save_for_condwait()); mutex.reset(); int ret(pthread_cond_timedwait(&cond_, mutex.native_handle(), ts)); // update_ops(&mutex, mutex.key(), oc_mutex_lock); mutex.restore_from_condwait(condwait_ctx); waiter_ = false; return ret; } int signal() { update_ops(this, key_, oc_cond_signal); cond_check(waiter_, 
get_key_name(key_), "Signalling condition variable without waiter"); return pthread_cond_signal(&cond_); } int broadcast() { update_ops(this, key_, oc_cond_broadcast); return pthread_cond_broadcast(&cond_); } bool inplace() const { return inplace_; } private: pthread_cond_t cond_; const wsrep::thread_service::cond_key* key_; const bool inplace_; bool waiter_; }; } int db::ti::before_init() { wsrep::log_info() << "db::ti::before_init()"; return 0; } int db::ti::after_init() { wsrep::log_info() << "db::ti::after_init()"; return 0; } ////////////////////////////////////////////////////////////////////////////// // Thread // ////////////////////////////////////////////////////////////////////////////// extern "C" { static void* start_thread(void* args_ptr) { thread_args* ta(reinterpret_cast(args_ptr)); ti_thread* thread = reinterpret_cast(ta->this_thread); this_ti_thread = thread; void* (*fn)(void*) = ta->fn; void* args = ta->args; delete ta; void* ret = (*fn)(args); this_ti_thread = nullptr; // If we end here the thread returned instead of calling // pthread_exit() if (thread->detached()) delete thread; return ret; } WSREP_NORETURN static void exit_thread(wsrep::thread_service::thread* thread, void* retval) { this_ti_thread = nullptr; ti_thread* th(reinterpret_cast(thread)); th->retval(retval); if (th->detached()) delete th; pthread_exit(retval); } } // extern "C" db::ti::ti() { thread_service::exit = exit_thread; } const wsrep::thread_service::thread_key* db::ti::create_thread_key(const char* name) WSREP_NOEXCEPT { assert(name); return reinterpret_cast( append_key(name, "thread")); } int db::ti::create_thread(const wsrep::thread_service::thread_key* key, wsrep::thread_service::thread** thread, void* (*fn)(void*), void* args) WSREP_NOEXCEPT { auto pit(new ti_thread(key)); total_allocations++; int ret; if ((ret = pit->run(fn, args))) { delete pit; } else { *thread = reinterpret_cast(pit); } return ret; } int db::ti::detach(wsrep::thread_service::thread* thread) 
WSREP_NOEXCEPT { return reinterpret_cast(thread)->detach(); } int db::ti::equal(wsrep::thread_service::thread* thread_1, wsrep::thread_service::thread* thread_2) WSREP_NOEXCEPT { return (reinterpret_cast(thread_1)->equal( reinterpret_cast(thread_2))); } int db::ti::join(wsrep::thread_service::thread* thread, void** retval) WSREP_NOEXCEPT { ti_thread* th(reinterpret_cast(thread)); int ret(th->join(retval)); if (not th->detached()) { delete th; } return ret; } wsrep::thread_service::thread* db::ti::self() WSREP_NOEXCEPT { return reinterpret_cast(ti_thread::self()); } int db::ti::setschedparam(wsrep::thread_service::thread* thread, int policy, const struct sched_param* param) WSREP_NOEXCEPT { return reinterpret_cast(thread)->setschedparam(policy, param); } int db::ti::getschedparam(wsrep::thread_service::thread* thread, int* policy, struct sched_param* param) WSREP_NOEXCEPT { return reinterpret_cast(thread)->getschedparam(policy, param); } ////////////////////////////////////////////////////////////////////////////// // Mutex // ////////////////////////////////////////////////////////////////////////////// const wsrep::thread_service::mutex_key* db::ti::create_mutex_key(const char* name) WSREP_NOEXCEPT { assert(name); return reinterpret_cast( append_key(name, "mutex")); } wsrep::thread_service::mutex* db::ti::init_mutex(const wsrep::thread_service::mutex_key* key, void* memblock, size_t memblock_size) WSREP_NOEXCEPT { return reinterpret_cast( memblock_size >= sizeof(ti_mutex) ? 
new (memblock) ti_mutex(key, true) : new ti_mutex(key, false)); } int db::ti::destroy(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT { ti_mutex* m(reinterpret_cast(mutex)); if (m->inplace()) { m->~ti_mutex(); } else { delete m; } return 0; } int db::ti::lock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT { return reinterpret_cast(mutex)->lock(); } int db::ti::trylock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT { return reinterpret_cast(mutex)->trylock(); } int db::ti::unlock(wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT { return reinterpret_cast(mutex)->unlock(); } ////////////////////////////////////////////////////////////////////////////// // Cond // ////////////////////////////////////////////////////////////////////////////// const wsrep::thread_service::cond_key* db::ti::create_cond_key(const char* name) WSREP_NOEXCEPT { assert(name); return reinterpret_cast( append_key(name, "cond")); } wsrep::thread_service::cond* db::ti::init_cond(const wsrep::thread_service::cond_key* key, void* memblock, size_t memblock_size) WSREP_NOEXCEPT { return reinterpret_cast( memblock_size >= sizeof(ti_cond) ? 
new (memblock) ti_cond(key, true) : new ti_cond(key, false)); } int db::ti::destroy(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT { ti_cond* c(reinterpret_cast(cond)); if (c->inplace()) { c->~ti_cond(); } else { delete c; } return 0; } int db::ti::wait(wsrep::thread_service::cond* cond, wsrep::thread_service::mutex* mutex) WSREP_NOEXCEPT { return reinterpret_cast(cond)->wait( *reinterpret_cast(mutex)); } int db::ti::timedwait(wsrep::thread_service::cond* cond, wsrep::thread_service::mutex* mutex, const struct timespec* ts) WSREP_NOEXCEPT { return reinterpret_cast(cond)->timedwait( *reinterpret_cast(mutex), ts); } int db::ti::signal(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT { return reinterpret_cast(cond)->signal(); } int db::ti::broadcast(wsrep::thread_service::cond* cond) WSREP_NOEXCEPT { return reinterpret_cast(cond)->broadcast(); } void db::ti::level(int level) { ::op_level = level; } void db::ti::cond_checks(bool cond_checks) { if (cond_checks) wsrep::log_info() << "Enabling condition variable checking"; ::cond_checks = cond_checks; } std::string db::ti::stats() { std::ostringstream os; os << "Totals:\n"; for (size_t i(0); i < total_ops.size(); ++i) { if (total_ops[i] > 0) { os << " " << ti_opstring(static_cast(i)) << ": " << total_ops[i] << "\n"; } } os << "Total allocations: " << total_allocations << "\n"; os << "Mutex contention: " << mutex_contention << "\n"; for (auto i : mutex_contention_counts) { os << " " << i.first << ": " << i.second << "\n"; } os << "Per key:\n"; std::map> sorted; for (size_t i(0); i < ops_map.size(); ++i) { sorted.insert(std::make_pair(get_key_name_by_index(i), ops_map[i])); } for (auto i : sorted) { for (size_t j(0); j < i.second.size(); ++j) { if (i.second[j]) { os << " " << i.first << ": " << ti_opstring(static_cast(j)) << ": " << i.second[j] << "\n"; } } } return os.str(); }