diff options
Diffstat (limited to 'storage/tokudb/tokudb_thread.h')
-rw-r--r-- | storage/tokudb/tokudb_thread.h | 526 |
1 files changed, 526 insertions, 0 deletions
diff --git a/storage/tokudb/tokudb_thread.h b/storage/tokudb/tokudb_thread.h new file mode 100644 index 00000000..53be6a75 --- /dev/null +++ b/storage/tokudb/tokudb_thread.h @@ -0,0 +1,526 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +/* -*- mode: C; c-basic-offset: 4 -*- */ +#ident "$Id$" +/*====== +This file is part of TokuDB + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + TokuDBis is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + TokuDB is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with TokuDB. If not, see <http://www.gnu.org/licenses/>. + +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#ifndef _TOKUDB_SYNC_H +#define _TOKUDB_SYNC_H + +#include "hatoku_defines.h" +#include "tokudb_debug.h" +#include "tokudb_time.h" + +namespace tokudb { +namespace thread { + +extern const pfs_key_t pfs_not_instrumented; + +uint my_tid(void); + +// Your basic mutex +class mutex_t { +public: + explicit mutex_t(pfs_key_t key); + mutex_t(void) : mutex_t(pfs_not_instrumented) {} + ~mutex_t(void); + + void reinit(pfs_key_t key); + void lock( +#if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE) + const char* src_file, + uint src_line +#endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE + ); + void unlock( +#if defined(SAFE_MUTEX) + const char* src_file, + uint src_line +#endif // SAFE_MUTEX + ); +#ifdef TOKUDB_DEBUG + bool is_owned_by_me(void) const; +#endif +private: + static pthread_t _null_owner; + mysql_mutex_t _mutex; +#ifdef TOKUDB_DEBUG + uint _owners; + pthread_t _owner; +#endif +}; + +// Simple read write lock +class rwlock_t { +public: + explicit rwlock_t(pfs_key_t key); + rwlock_t(void) : rwlock_t(pfs_not_instrumented) {} + ~rwlock_t(void); + + void lock_read( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + const char* src_file, + uint src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + ); + void lock_write( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + const char* src_file, + uint src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + ); + void unlock(void); + +private: + rwlock_t(const rwlock_t&); + rwlock_t& operator=(const rwlock_t&); + + mysql_rwlock_t _rwlock; +}; + +// Simple event signal/wait class +class event_t { +public: + // create_signalled - create the event in a signalled state + // manual_reset - create an event that must be manually reset + // after signaling + event_t( + bool create_signalled = false, + bool manual_reset = false); + ~event_t(void); + + // wait for the event to become signalled + void wait(void); + + // signal the event + void signal(void); + + // pulse the event (signal and free exactly one waiter) + void pulse(void); + + // is the event currently signalled + bool signalled(void); + + // unsignal/clear the event + void reset(void); + +private: + event_t(const event_t&); + event_t& operator=(const event_t&); + + pthread_mutex_t _mutex; + pthread_cond_t _cond; + bool _signalled; + bool _pulsed; + bool _manual_reset; +}; + +// Semaphore signal/wait class +class semaphore_t { +public: + // initial_count - the initial signal count of the semaphore + // max_count - the maximum signal count for the semaphore. + semaphore_t(int initial_count, int max_count); + ~semaphore_t(void); + + enum E_WAIT { + E_SIGNALLED = 0, + E_INTERRUPTED = 1, + E_TIMEDOUT = 2 + }; + + // wait for the semaphore to become signalled + E_WAIT wait(void); + + // signal the semaphore to increase the count + // return true if signalled, false if ignored due to count + bool signal(void); + + // what is the semaphore signal count + int signalled(void); + + // unsignal a signalled semaphore + void reset(void); + + // set to interrupt any waiters, as long as is set, + // waiters will return immediately with E_INTERRUPTED. + // the semaphore signal count and tracking will continue + // accepting signals and leave the signalled state intact + void set_interrupt(void); + void clear_interrupt(void); + +private: + semaphore_t(const semaphore_t&); + semaphore_t& operator=(const semaphore_t&); + + pthread_mutex_t _mutex; + pthread_cond_t _cond; + bool _interrupted; + int _signalled; + int _initial_count; + int _max_count; +}; + +// Thread class +class thread_t { +public: + thread_t(void); + ~thread_t(void); + + int start(void* (*pfn)(void*), void* arg); + int join(void** value_ptr); + int detach(void); + +private: + pthread_t _thread; +}; + +inline uint my_tid(void) { return (uint)toku_os_gettid(); } + +inline mutex_t::mutex_t(pfs_key_t key) { +#ifdef TOKUDB_DEBUG + _owners = 0; + _owner = _null_owner; +#endif + int r MY_ATTRIBUTE((unused)) = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST); + assert_debug(r == 0); +} +inline mutex_t::~mutex_t() { +#ifdef TOKUDB_DEBUG + assert_debug(_owners == 0); +#endif + int r MY_ATTRIBUTE((unused)) = mysql_mutex_destroy(&_mutex); + assert_debug(r == 0); +} +inline void mutex_t::reinit(pfs_key_t key) { +#ifdef TOKUDB_DEBUG + assert_debug(_owners == 0); +#endif + int r MY_ATTRIBUTE((unused)); + r = mysql_mutex_destroy(&_mutex); + assert_debug(r == 0); + r = mysql_mutex_init(key, &_mutex, MY_MUTEX_INIT_FAST); + assert_debug(r == 0); +} +inline void mutex_t::lock( +#if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE) + const char* src_file, + uint src_line +#endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE + ) { + assert_debug(is_owned_by_me() == false); + int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_lock(&_mutex +#if defined(SAFE_MUTEX) || defined(HAVE_PSI_MUTEX_INTERFACE) + , + src_file, + src_line +#endif // SAFE_MUTEX || HAVE_PSI_MUTEX_INTERFACE + ); + assert_debug(r == 0); +#ifdef TOKUDB_DEBUG + _owners++; + _owner = pthread_self(); +#endif +} +inline void mutex_t::unlock( +#if defined(SAFE_MUTEX) + const char* src_file, + uint src_line +#endif // SAFE_MUTEX + ) { +#ifdef TOKUDB_DEBUG + assert_debug(_owners > 0); + assert_debug(is_owned_by_me()); + _owners--; + _owner = _null_owner; +#endif + int r MY_ATTRIBUTE((unused)) = inline_mysql_mutex_unlock(&_mutex +#if defined(SAFE_MUTEX) + , + src_file, + src_line +#endif // SAFE_MUTEX + ); + assert_debug(r == 0); +} +#ifdef TOKUDB_DEBUG +inline bool mutex_t::is_owned_by_me(void) const { + return pthread_equal(pthread_self(), _owner) != 0 ? true : false; +} +#endif + +inline rwlock_t::rwlock_t(pfs_key_t key) { + int r MY_ATTRIBUTE((unused)) = mysql_rwlock_init(key, &_rwlock); + assert_debug(r == 0); +} +inline rwlock_t::~rwlock_t() { + int r MY_ATTRIBUTE((unused)) = mysql_rwlock_destroy(&_rwlock); + assert_debug(r == 0); +} +inline void rwlock_t::lock_read( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + const char* src_file, + uint src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + ) { + int r; + while ((r = inline_mysql_rwlock_rdlock(&_rwlock +#ifdef HAVE_PSI_RWLOCK_INTERFACE + , + src_file, + src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + )) != 0) { + if (r == EBUSY || r == EAGAIN) { + time::sleep_microsec(1000); + continue; + } + break; + } + assert_debug(r == 0); +} +inline void rwlock_t::lock_write( +#ifdef HAVE_PSI_RWLOCK_INTERFACE + const char* src_file, + uint src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + ) { + int r; + while ((r = inline_mysql_rwlock_wrlock(&_rwlock +#ifdef HAVE_PSI_RWLOCK_INTERFACE + , + src_file, + src_line +#endif // HAVE_PSI_RWLOCK_INTERFACE + )) != 0) { + if (r == EBUSY || r == EAGAIN) { + time::sleep_microsec(1000); + continue; + } + break; + } + assert_debug(r == 0); +} +inline void rwlock_t::unlock(void) { + int r MY_ATTRIBUTE((unused)) = mysql_rwlock_unlock(&_rwlock); + assert_debug(r == 0); +} +inline rwlock_t::rwlock_t(const rwlock_t&) {} +inline rwlock_t& rwlock_t::operator=(const rwlock_t&) { + return *this; +} + + +inline event_t::event_t(bool create_signalled, bool manual_reset) : + _manual_reset(manual_reset) { + + int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL); + assert_debug(r == 0); + r = pthread_cond_init(&_cond, NULL); + assert_debug(r == 0); + if (create_signalled) { + _signalled = true; + } else { + _signalled = false; + } + _pulsed = false; +} +inline event_t::~event_t(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex); + assert_debug(r == 0); + r = pthread_cond_destroy(&_cond); + assert_debug(r == 0); +} +inline void event_t::wait(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + while (_signalled == false && _pulsed == false) { + r = pthread_cond_wait(&_cond, &_mutex); + assert_debug(r == 0); + } + if (_manual_reset == false) + _signalled = false; + if (_pulsed) + _pulsed = false; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return; +} +inline void event_t::signal(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _signalled = true; + if (_manual_reset) { + r = pthread_cond_broadcast(&_cond); + assert_debug(r == 0); + } else { + r = pthread_cond_signal(&_cond); + assert_debug(r == 0); + } + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); +} +inline void event_t::pulse(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _pulsed = true; + r = pthread_cond_signal(&_cond); + assert_debug(r == 0); + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); +} +inline bool event_t::signalled(void) { + bool ret = false; + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + ret = _signalled; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return ret; +} +inline void event_t::reset(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _signalled = false; + _pulsed = false; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return; +} +inline event_t::event_t(const event_t&) { +} +inline event_t& event_t::operator=(const event_t&) { + return *this; +} + + +inline semaphore_t::semaphore_t( + int initial_count, + int max_count) : + _interrupted(false), + _initial_count(initial_count), + _max_count(max_count) { + + int r MY_ATTRIBUTE((unused)) = pthread_mutex_init(&_mutex, NULL); + assert_debug(r == 0); + r = pthread_cond_init(&_cond, NULL); + assert_debug(r == 0); + _signalled = _initial_count; +} +inline semaphore_t::~semaphore_t(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_destroy(&_mutex); + assert_debug(r == 0); + r = pthread_cond_destroy(&_cond); + assert_debug(r == 0); +} +inline semaphore_t::E_WAIT semaphore_t::wait(void) { + E_WAIT ret; + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + while (_signalled == 0 && _interrupted == false) { + r = pthread_cond_wait(&_cond, &_mutex); + assert_debug(r == 0); + } + if (_interrupted) { + ret = E_INTERRUPTED; + } else { + _signalled--; + ret = E_SIGNALLED; + } + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return ret; +} +inline bool semaphore_t::signal(void) { + bool ret = false; + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + if (_signalled < _max_count) { + _signalled++; + ret = true; + } + r = pthread_cond_signal(&_cond); + assert_debug(r == 0); + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return ret; +} +inline int semaphore_t::signalled(void) { + int ret = 0; + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + ret = _signalled; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return ret; +} +inline void semaphore_t::reset(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _signalled = 0; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); + return; +} +inline void semaphore_t::set_interrupt(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _interrupted = true; + r = pthread_cond_broadcast(&_cond); + assert_debug(r == 0); + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); +} +inline void semaphore_t::clear_interrupt(void) { + int r MY_ATTRIBUTE((unused)) = pthread_mutex_lock(&_mutex); + assert_debug(r == 0); + _interrupted = false; + r = pthread_mutex_unlock(&_mutex); + assert_debug(r == 0); +} +inline semaphore_t::semaphore_t(const semaphore_t&) { +} +inline semaphore_t& semaphore_t::operator=(const semaphore_t&) { + return *this; +} + + +inline thread_t::thread_t(void) : _thread(0) { +} +inline thread_t::~thread_t(void) { +} +inline int thread_t::start(void*(*pfn)(void*), void* arg) { + return pthread_create(&_thread, NULL, pfn, arg); +} +inline int thread_t::join(void** value_ptr) { + return pthread_join(_thread, value_ptr); +} +inline int thread_t::detach(void) { + return pthread_detach(_thread); +} + +} // namespace thread +} // namespace tokudb + + +#endif // _TOKUDB_SYNC_H |