diff options
Diffstat (limited to 'ml/dlib/dlib/threads')
40 files changed, 8575 insertions, 0 deletions
diff --git a/ml/dlib/dlib/threads/async.cpp b/ml/dlib/dlib/threads/async.cpp new file mode 100644 index 000000000..6aa947bcb --- /dev/null +++ b/ml/dlib/dlib/threads/async.cpp @@ -0,0 +1,48 @@ +// Copyright (C) 2016 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_AsYNC_CPP_ +#define DLIB_AsYNC_CPP_ + +// C++11 things don't work in old versions of visual studio +#if !defined( _MSC_VER) || _MSC_VER >= 1900 + +#include "async.h" +#include <stdlib.h> +#include "../string.h" +#include <thread> + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + namespace impl + { + unsigned long default_num_threads() + { + try + { + char* nt = getenv("DLIB_NUM_THREADS"); + if (nt) + return string_cast<unsigned long>(nt); + } catch(string_cast_error&) {} + return std::thread::hardware_concurrency(); + } + } + +// ---------------------------------------------------------------------------------------- + + thread_pool& default_thread_pool() + { + static thread_pool tp(impl::default_num_threads()); + return tp; + } +} + +// ---------------------------------------------------------------------------------------- + +#endif + +#endif // DLIB_AsYNC_CPP_ + + diff --git a/ml/dlib/dlib/threads/async.h b/ml/dlib/dlib/threads/async.h new file mode 100644 index 000000000..bc6fe5575 --- /dev/null +++ b/ml/dlib/dlib/threads/async.h @@ -0,0 +1,105 @@ +// Copyright (C) 2016 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_AsYNC_Hh_ +#define DLIB_AsYNC_Hh_ + +// C++11 things don't work in old versions of visual studio +#if !defined( _MSC_VER) || _MSC_VER >= 1900 + +#include "async_abstract.h" +#include "thread_pool_extension.h" +#include <future> +#include <functional> + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + namespace impl + { + template <typename T> struct selector {}; + + template <typename T, typename U, typename V> + void call_prom_set_value( + T& prom, + U& fun, + selector<V> + ) + { + prom.set_value(fun()); + } + + template <typename T, typename U> + void call_prom_set_value( + T& prom, + U& fun, + selector<void> + ) + { + fun(); + prom.set_value(); + } + } + +// ---------------------------------------------------------------------------------------- + + thread_pool& default_thread_pool(); + +// ---------------------------------------------------------------------------------------- + + template < + typename Function, + typename ...Args + > + std::future<typename std::result_of<Function(Args...)>::type> async( + thread_pool& tp, + Function&& f, + Args&&... args + ) + { + auto prom = std::make_shared<std::promise<typename std::result_of<Function(Args...)>::type>>(); + std::future<typename std::result_of<Function(Args...)>::type> ret = prom->get_future(); + using bind_t = decltype(std::bind(std::forward<Function>(f), std::forward<Args>(args)...)); + auto fun = std::make_shared<bind_t>(std::bind(std::forward<Function>(f), std::forward<Args>(args)...)); + tp.add_task_by_value([fun, prom]() + { + try + { + impl::call_prom_set_value(*prom, *fun, impl::selector<typename std::result_of<Function(Args...)>::type>()); + } + catch(...) + { + prom->set_exception(std::current_exception()); + } + }); + return std::move(ret); + } + +// ---------------------------------------------------------------------------------------- + + template < + typename Function, + typename ...Args + > + std::future<typename std::result_of<Function(Args...)>::type> async( + Function&& f, + Args&&... args + ) + { + return async(default_thread_pool(), std::forward<Function>(f), std::forward<Args>(args)...); + } + +} + +// ---------------------------------------------------------------------------------------- + +#ifdef NO_MAKEFILE +#include "async.cpp" +#endif + +#endif +#endif // DLIB_AsYNC_Hh_ + + + diff --git a/ml/dlib/dlib/threads/async_abstract.h b/ml/dlib/dlib/threads/async_abstract.h new file mode 100644 index 000000000..a9fa1e458 --- /dev/null +++ b/ml/dlib/dlib/threads/async_abstract.h @@ -0,0 +1,67 @@ +// Copyright (C) 2016 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_AsYNC_ABSTRACT_Hh_ +#ifdef DLIB_AsYNC_ABSTRACT_Hh_ + +#include "thread_pool_extension_abstract.h" +#include <future> +#include <functional> + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + thread_pool& default_thread_pool( + ); + /*! + ensures + - returns a reference to a global thread_pool. If the DLIB_NUM_THREADS + environment variable is set to an integer then the thread pool will contain + DLIB_NUM_THREADS threads, otherwise it will contain + std::thread::hardware_concurrency() threads. + !*/ + +// ---------------------------------------------------------------------------------------- + + template < + typename Function, + typename ...Args + > + std::future<typename std::result_of<Function(Args...)>::type> async( + thread_pool& tp, + Function&& f, + Args&&... args + ); + /*! + requires + - f must be a function and f(args...) must be a valid expression. + ensures + - This function behaves just like std::async(std::launch::async, f, args) + except that instead of spawning a new thread to process each task it submits + the task to the provided dlib::thread_pool. Therefore, dlib::async() is + guaranteed to use a bounded number of threads unlike std::async(). This also + means that calls to dlib::async() will block if there aren't any free threads + in the thread pool. + !*/ + +// ---------------------------------------------------------------------------------------- + + template < + typename Function, + typename ...Args + > + std::future<typename std::result_of<Function(Args...)>::type> async( + Function&& f, + Args&&... args + ); + /*! + ensures + - Calling this function is equivalent to directly calling async(default_thread_pool(), f, args...) + !*/ +} + +// ---------------------------------------------------------------------------------------- + +#endif // DLIB_AsYNC_ABSTRACT_Hh_ + diff --git a/ml/dlib/dlib/threads/auto_mutex_extension.h b/ml/dlib/dlib/threads/auto_mutex_extension.h new file mode 100644 index 000000000..595c1b176 --- /dev/null +++ b/ml/dlib/dlib/threads/auto_mutex_extension.h @@ -0,0 +1,180 @@ +// Copyright (C) 2005 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_AUTO_MUTEX_EXTENSIOn_ +#define DLIB_AUTO_MUTEX_EXTENSIOn_ + +#include "threads_kernel.h" +#include "rmutex_extension.h" +#include "read_write_mutex_extension.h" +#include "auto_mutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class auto_mutex + { + /*! + INITIAL VALUE + - if (m != 0) then + - the mutex pointed to by m is locked + - if (r != 0) then + - the mutex pointed to by r is locked + - if (rw != 0) then + - the mutex pointed to by rw is locked + - exactly one of r, m, or rw is not 0. + + CONVENTION + - if (m != 0) then + - the mutex pointed to by m is locked + - if (r != 0) then + - the mutex pointed to by r is locked + - if (rw != 0) then + - the mutex pointed to by rw is locked + - exactly one of r, m, or rw is not 0. + !*/ + public: + + explicit auto_mutex ( + const mutex& m_ + ) : m(&m_), + r(0), + rw(0) + { + m->lock(); + } + + explicit auto_mutex ( + const rmutex& r_ + ) : m(0), + r(&r_), + rw(0) + { + r->lock(); + } + + explicit auto_mutex ( + const read_write_mutex& rw_ + ) : m(0), + r(0), + rw(&rw_) + { + rw->lock(); + } + + void unlock() + { + if (m != 0) + { + m->unlock(); + m = 0; + } + else if (r != 0) + { + r->unlock(); + r = 0; + } + else if (rw != 0) + { + rw->unlock(); + rw = 0; + } + } + + ~auto_mutex ( + ) + { + unlock(); + } + + private: + + const mutex* m; + const rmutex* r; + const read_write_mutex* rw; + + // restricted functions + auto_mutex(auto_mutex&); // copy constructor + auto_mutex& operator=(auto_mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + + class auto_mutex_readonly + { + public: + + explicit auto_mutex_readonly ( + const read_write_mutex& rw_ + ) : rw(rw_), _has_write_lock(false), _has_read_lock(true) + { + rw.lock_readonly(); + } + + ~auto_mutex_readonly ( + ) + { + unlock(); + } + + void lock_readonly ( + ) + { + if (!_has_read_lock) + { + unlock(); + rw.lock_readonly(); + _has_read_lock = true; + } + } + + void lock_write ( + ) + { + if (!_has_write_lock) + { + unlock(); + rw.lock(); + _has_write_lock = true; + } + } + + void unlock ( + ) + { + if (_has_write_lock) + { + rw.unlock(); + _has_write_lock = false; + } + else if (_has_read_lock) + { + rw.unlock_readonly(); + _has_read_lock = false; + } + } + + bool has_read_lock ( + ) { return _has_read_lock; } + + bool has_write_lock ( + ) { return _has_write_lock; } + + private: + + const read_write_mutex& rw; + bool _has_write_lock; + bool _has_read_lock; + + // restricted functions + auto_mutex_readonly(auto_mutex_readonly&); // copy constructor + auto_mutex_readonly& operator=(auto_mutex_readonly&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_AUTO_MUTEX_EXTENSIOn_ + diff --git a/ml/dlib/dlib/threads/auto_mutex_extension_abstract.h b/ml/dlib/dlib/threads/auto_mutex_extension_abstract.h new file mode 100644 index 000000000..1990c834e --- /dev/null +++ b/ml/dlib/dlib/threads/auto_mutex_extension_abstract.h @@ -0,0 +1,185 @@ +// Copyright (C) 2005 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_AUTO_MUTEX_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_AUTO_MUTEX_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" +#include "rmutex_extension_abstract.h" +#include "read_write_mutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class auto_mutex + { + /*! + INITIAL VALUE + The mutex given in the constructor is locked and associated with this + object. + + WHAT THIS OBJECT REPRESENTS + This object represents a mechanism for automatically locking and unlocking + a mutex object. + !*/ + public: + + explicit auto_mutex ( + const mutex& m + ); + /*! + ensures + - #*this is properly initialized + - m will be locked + !*/ + + explicit auto_mutex ( + const rmutex& m + ); + /*! + ensures + - #*this is properly initialized + - m will be locked + !*/ + + explicit auto_mutex ( + const read_write_mutex& m + ); + /*! + ensures + - #*this is properly initialized + - m will be locked via m.lock() (i.e. a write lock will be obtained) + !*/ + + void unlock( + ); + /*! + ensures + - if (unlock() has not already been called) then + - The mutex associated with *this has been unlocked. This is useful if + you want to unlock a mutex before the auto_mutex destructor executes. + !*/ + + ~auto_mutex ( + ); + /*! + ensures + - all resources allocated by *this have been freed + - calls unlock() + !*/ + + private: + // restricted functions + auto_mutex(auto_mutex&); // copy constructor + auto_mutex& operator=(auto_mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + + class auto_mutex_readonly + { + /*! + INITIAL VALUE + The mutex given in the constructor is locked using a read-only lock and + associated with this object. + + WHAT THIS OBJECT REPRESENTS + This object represents a mechanism for automatically locking and unlocking + a read_write_mutex object. In particular, a readonly lock is used. + !*/ + public: + + explicit auto_mutex_readonly ( + const read_write_mutex& m + ); + /*! + ensures + - #*this is properly initialized + - a readonly lock will be obtained on m using m.lock_readonly() + - #has_read_lock() == true + !*/ + + ~auto_mutex_readonly ( + ); + /*! + ensures + - all resources allocated by *this have been freed + - the mutex associated with *this has been unlocked + !*/ + + bool has_read_lock ( + ); + /*! + ensures + - returns true if this object has called read_write_mutex::lock_readonly() + on its associated mutex and has yet to release that lock. + !*/ + + bool has_write_lock ( + ); + /*! + ensures + - returns true if this object has called read_write_mutex::lock() on its + associated mutex and has yet to release that lock. + !*/ + + void lock_readonly ( + ); + /*! + ensures + - This function converts the lock on the associated mutex into a readonly lock. + Specifically: + if (!has_read_lock()) then + - if (has_write_lock()) then + - unlocks the associated mutex and then relocks it by calling + read_write_mutex::lock_readonly() + - else + - locks the associated mutex by calling read_write_mutex::lock_readonly() + - #has_read_lock() == true + - Note that the lock switch is not atomic. This means that whatever + resource is protected by the mutex might have been modified during the + call to lock_readonly(). + !*/ + + void lock_write ( + ); + /*! + ensures + - This function converts the lock on the associated mutex into a write lock. + Specifically: + if (!has_write_lock()) then + - if (has_read_lock()) then + - unlocks the associated mutex and then relocks it by calling + read_write_mutex::lock() + - else + - locks the associated mutex by calling read_write_mutex::lock() + - #has_write_lock() == true + - Note that the lock switch is not atomic. This means that whatever + resource is protected by the mutex might have been modified during the + call to lock_write(). + !*/ + + void unlock ( + ); + /*! + ensures + - if (has_read_lock() || has_write_lock()) then + - unlocks the associated mutex. This is useful if you want to unlock a + mutex before the auto_mutex_readonly destructor executes. + - #has_read_lock() == false + - #has_write_lock() == false + !*/ + + private: + // restricted functions + auto_mutex_readonly(auto_mutex_readonly&); // copy constructor + auto_mutex_readonly& operator=(auto_mutex_readonly&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_AUTO_MUTEX_EXTENSIOn_ABSTRACT_ + diff --git a/ml/dlib/dlib/threads/auto_unlock_extension.h b/ml/dlib/dlib/threads/auto_unlock_extension.h new file mode 100644 index 000000000..cd1d4db9a --- /dev/null +++ b/ml/dlib/dlib/threads/auto_unlock_extension.h @@ -0,0 +1,116 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_AUTO_UNLOCK_EXTENSIOn_ +#define DLIB_AUTO_UNLOCK_EXTENSIOn_ + +#include "threads_kernel.h" +#include "rmutex_extension.h" +#include "read_write_mutex_extension.h" +#include "auto_unlock_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class auto_unlock + { + /*! + INITIAL VALUE + - if (m != 0) then + - the mutex pointed to by m is locked + - if (r != 0) then + - the mutex pointed to by r is locked + - if (rw != 0) then + - the mutex pointed to by rw is locked + - exactly one of r, m, or rw is not 0. + + CONVENTION + - if (m != 0) then + - the mutex pointed to by m is locked + - if (r != 0) then + - the mutex pointed to by r is locked + - if (rw != 0) then + - the mutex pointed to by rw is locked + - exactly one of r, m, or rw is not 0. + !*/ + public: + + explicit auto_unlock ( + const mutex& m_ + ) : m(&m_), + r(0), + rw(0) + {} + + explicit auto_unlock ( + const rmutex& r_ + ) : m(0), + r(&r_), + rw(0) + {} + + explicit auto_unlock ( + const read_write_mutex& rw_ + ) : m(0), + r(0), + rw(&rw_) + {} + + ~auto_unlock ( + ) + { + if (m != 0) + m->unlock(); + else if (r != 0) + r->unlock(); + else + rw->unlock(); + } + + private: + + const mutex* m; + const rmutex* r; + const read_write_mutex* rw; + + // restricted functions + auto_unlock(auto_unlock&); // copy constructor + auto_unlock& operator=(auto_unlock&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + + class auto_unlock_readonly + { + + public: + + explicit auto_unlock_readonly ( + const read_write_mutex& rw_ + ) : + rw(rw_) + {} + + ~auto_unlock_readonly ( + ) + { + rw.unlock_readonly(); + } + + private: + + const read_write_mutex& rw; + + // restricted functions + auto_unlock_readonly(auto_unlock_readonly&); // copy constructor + auto_unlock_readonly& operator=(auto_unlock_readonly&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_AUTO_UNLOCK_EXTENSIOn_ + + diff --git a/ml/dlib/dlib/threads/auto_unlock_extension_abstract.h b/ml/dlib/dlib/threads/auto_unlock_extension_abstract.h new file mode 100644 index 000000000..f947d4879 --- /dev/null +++ b/ml/dlib/dlib/threads/auto_unlock_extension_abstract.h @@ -0,0 +1,116 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_AUTO_UNLOCK_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_AUTO_UNLOCK_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" +#include "rmutex_extension_abstract.h" +#include "read_write_mutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class auto_unlock + { + /*! + INITIAL VALUE + The mutex given in the constructor is associated with this object. + + WHAT THIS OBJECT REPRESENTS + This object represents a mechanism for automatically unlocking + a mutex object. It is useful when you already have a locked mutex + and want to make sure it gets unlocked even if an exception is thrown + or you quit the function at a weird spot. + !*/ + public: + + explicit auto_unlock ( + const mutex& m + ); + /*! + ensures + - #*this is properly initialized + - does not modify m in any way + !*/ + + explicit auto_unlock ( + const rmutex& m + ); + /*! + ensures + - #*this is properly initialized + - does not modify m in any way + !*/ + + explicit auto_unlock ( + const read_write_mutex& m + ); + /*! + ensures + - #*this is properly initialized + - does not modify m in any way + !*/ + + ~auto_unlock ( + ); + /*! + ensures + - all resources allocated by *this have been freed + - calls unlock() on the mutex associated with *this + !*/ + + private: + // restricted functions + auto_unlock(auto_unlock&); // copy constructor + auto_unlock& operator=(auto_unlock&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + + class auto_unlock_readonly + { + /*! + INITIAL VALUE + The mutex given in the constructor is associated with this object. + + WHAT THIS OBJECT REPRESENTS + This object represents a mechanism for automatically unlocking + a read_write_mutex object. It is useful when you already have a locked mutex + and want to make sure it gets unlocked even if an exception is thrown + or you quit the function at a weird spot. Note that the mutex + is unlocked by calling unlock_readonly() on it. + !*/ + public: + + explicit auto_unlock_readonly ( + const read_write_mutex& m + ); + /*! + ensures + - #*this is properly initialized + - does not modify m in any way + !*/ + + ~auto_unlock_readonly ( + ); + /*! + ensures + - all resources allocated by *this have been freed + - calls unlock_readonly() on the mutex associated with *this + !*/ + + private: + // restricted functions + auto_unlock_readonly(auto_unlock_readonly&); // copy constructor + auto_unlock_readonly& operator=(auto_unlock_readonly&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_AUTO_UNLOCK_EXTENSIOn_ABSTRACT_ + + diff --git a/ml/dlib/dlib/threads/create_new_thread_extension.h b/ml/dlib/dlib/threads/create_new_thread_extension.h new file mode 100644 index 000000000..8f419b6be --- /dev/null +++ b/ml/dlib/dlib/threads/create_new_thread_extension.h @@ -0,0 +1,46 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_CREATE_NEW_THREAD_EXTENSIOn_ +#define DLIB_CREATE_NEW_THREAD_EXTENSIOn_ + +#include "threads_kernel_abstract.h" +#include "create_new_thread_extension_abstract.h" +#include "../threads.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template < + typename T, + void (T::*funct)() + > + inline void dlib_create_new_thread_helper ( + void* obj + ) + { + T* o = static_cast<T*>(obj); + (o->*funct)(); + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T, + void (T::*funct)() + > + inline bool create_new_thread ( + T& obj + ) + { + return create_new_thread(dlib_create_new_thread_helper<T,funct>,&obj); + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_CREATE_NEW_THREAD_EXTENSIOn_ + + diff --git a/ml/dlib/dlib/threads/create_new_thread_extension_abstract.h b/ml/dlib/dlib/threads/create_new_thread_extension_abstract.h new file mode 100644 index 000000000..43fbc474d --- /dev/null +++ b/ml/dlib/dlib/threads/create_new_thread_extension_abstract.h @@ -0,0 +1,33 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_CREATE_NEW_THREAD_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_CREATE_NEW_THREAD_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template < + typename T, + void (T::*funct)() + > + bool create_new_thread ( + T& obj + ); + /*! + ensures + - creates a new thread and calls obj.*funct() from it. + - returns true upon success and false upon failure to create the new thread. + !*/ + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_CREATE_NEW_THREAD_EXTENSIOn_ABSTRACT_ + + + diff --git a/ml/dlib/dlib/threads/multithreaded_object_extension.cpp b/ml/dlib/dlib/threads/multithreaded_object_extension.cpp new file mode 100644 index 000000000..def4af5f2 --- /dev/null +++ b/ml/dlib/dlib/threads/multithreaded_object_extension.cpp @@ -0,0 +1,241 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_MULTITHREADED_OBJECT_EXTENSIOn_CPP +#define DLIB_MULTITHREADED_OBJECT_EXTENSIOn_CPP + +#include "multithreaded_object_extension.h" +#include "create_new_thread_extension.h" + + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + multithreaded_object:: + multithreaded_object ( + ): + s(m_), + is_running_(false), + should_stop_(false), + threads_started(0) + { + } + +// ---------------------------------------------------------------------------------------- + + multithreaded_object:: + ~multithreaded_object ( + ) + { + try + { + DLIB_ASSERT(number_of_threads_alive() == 0, + "\tmultithreaded_object::~multithreaded_object()" + << "\n\tYou have let a multithreaded object destruct itself before terminating its threads" + << "\n\tthis: " << this + ); + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + assert(false); + abort(); + } + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + clear ( + ) + { + auto_mutex M(m_); + stop(); + wait(); + dead_threads.clear(); + is_running_ = false; + should_stop_ = false; + } + +// ---------------------------------------------------------------------------------------- + + bool multithreaded_object:: + is_running ( + ) const + { + auto_mutex M(m_); + return is_running_; + } + +// ---------------------------------------------------------------------------------------- + + unsigned long multithreaded_object:: + number_of_threads_registered ( + ) const + { + auto_mutex M(m_); + return thread_ids.size() + dead_threads.size(); + } + +// ---------------------------------------------------------------------------------------- + + unsigned long multithreaded_object:: + number_of_threads_alive ( + ) const + { + auto_mutex M(m_); + return threads_started; + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + wait ( + ) const + { + auto_mutex M(m_); + + DLIB_ASSERT(thread_ids.is_in_domain(get_thread_id()) == false, + "\tvoid multithreaded_object::wait()" + << "\n\tYou can NOT call this function from one of the threads registered in this object" + << "\n\tthis: " << this + ); + + while (threads_started > 0) + s.wait(); + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + start ( + ) + { + auto_mutex M(m_); + const unsigned long num_threads_registered = dead_threads.size() + thread_ids.size(); + // start any dead threads + for (unsigned long i = threads_started; i < num_threads_registered; ++i) + { + if (create_new_thread<multithreaded_object,&multithreaded_object::thread_helper>(*this) == false) + { + should_stop_ = true; + is_running_ = false; + throw thread_error(); + } + ++threads_started; + } + is_running_ = true; + should_stop_ = false; + s.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + pause ( + ) + { + auto_mutex M(m_); + is_running_ = false; + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + stop ( + ) + { + auto_mutex M(m_); + should_stop_ = true; + is_running_ = false; + s.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + bool multithreaded_object:: + should_stop ( + ) const + { + auto_mutex M(m_); + DLIB_ASSERT(thread_ids.is_in_domain(get_thread_id()), + "\tbool multithreaded_object::should_stop()" + << "\n\tYou can only call this function from one of the registered threads in this object" + << "\n\tthis: " << this + ); + while (is_running_ == false && should_stop_ == false) + s.wait(); + return should_stop_; + } + +// ---------------------------------------------------------------------------------------- + + multithreaded_object::raii_thread_helper:: + raii_thread_helper( + multithreaded_object& self_, + thread_id_type id_ + ) : self(self_), id(id_){} + + multithreaded_object::raii_thread_helper:: + ~raii_thread_helper() + { + auto_mutex M(self.m_); + if (self.thread_ids.is_in_domain(id)) + { + mfp temp; + thread_id_type id_temp; + self.thread_ids.remove(id,id_temp,temp); + // put this thread's registered function back into the dead_threads queue + self.dead_threads.enqueue(temp); + } + + --self.threads_started; + // If this is the last thread to terminate then + // signal that that is the case. + if (self.threads_started == 0) + { + self.is_running_ = false; + self.should_stop_ = false; + self.s.broadcast(); + } + } + +// ---------------------------------------------------------------------------------------- + + void multithreaded_object:: + thread_helper( + ) + { + mfp mf; + thread_id_type id = get_thread_id(); + + // this guy's destructor does all the necessary cleanup in this function + raii_thread_helper raii(*this, id); + + // if there is a dead_thread sitting around then pull it + // out and put it into mf + { + auto_mutex M(m_); + if (dead_threads.size() > 0) + { + dead_threads.dequeue(mf); + mfp temp(mf); + thread_ids.add(id,temp); + } + } + + if (mf.is_set()) + { + // call the registered thread function + mf(); + } + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_MULTITHREADED_OBJECT_EXTENSIOn_CPP + + diff --git a/ml/dlib/dlib/threads/multithreaded_object_extension.h b/ml/dlib/dlib/threads/multithreaded_object_extension.h new file mode 100644 index 000000000..9dd37fdcc --- /dev/null +++ b/ml/dlib/dlib/threads/multithreaded_object_extension.h @@ -0,0 +1,153 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ +#define DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ + +#include "multithreaded_object_extension_abstract.h" +#include "threads_kernel.h" +#include "auto_mutex_extension.h" +#include "rmutex_extension.h" +#include "rsignaler_extension.h" +#include "../algs.h" +#include "../assert.h" +#include "../map.h" +#include "../member_function_pointer.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class multithreaded_object + { + /*! + INITIAL VALUE + - is_running_ == false + - should_stop_ == false + - thread_ids.size() == 0 + - dead_threads.size() == 0 + - threads_started == 0 + + CONVENTION + - number_of_threads_registered() == thread_ids.size() + dead_threads.size() + - number_of_threads_alive() == threads_started + + - is_running() == is_running_ + - should_stop() == should_stop_ + + - thread_ids == a map of current thread ids to the member function + pointers that that thread runs. + - threads_started == the number of threads that have been spawned to run + thread_helper but haven't ended yet. + + - dead_threads == a queue that contains all the member function pointers + for threads that are currently registered but not running + + - m_ == the mutex used to protect all our variables + - s == the signaler for m_ + !*/ + + public: + + multithreaded_object ( + ); + + virtual ~multithreaded_object ( + ) = 0; + + void clear ( + ); + + bool is_running ( + ) const; + + unsigned long number_of_threads_alive ( + ) const; + + unsigned long number_of_threads_registered ( + ) const; + + void wait ( + ) const; + + void start ( + ); + + void pause ( + ); + + void stop ( + ); + + protected: + + bool should_stop ( + ) const; + + template < + typename T + > + void register_thread ( + T& object, + void (T::*thread)() + ) + { + auto_mutex M(m_); + try + { + mfp mf; + mf.set(object,thread); + dead_threads.enqueue(mf); + if (is_running_) + start(); + } + catch (...) + { + is_running_ = false; + should_stop_ = true; + s.broadcast(); + throw; + } + } + + private: + + class raii_thread_helper + { + public: + raii_thread_helper(multithreaded_object& self_, thread_id_type id_); + ~raii_thread_helper(); + + multithreaded_object& self; + thread_id_type id; + }; + + void thread_helper( + ); + + typedef member_function_pointer<> mfp; + + rmutex m_; + rsignaler s; + map<thread_id_type,mfp,memory_manager<char>::kernel_2a>::kernel_1a thread_ids; + queue<mfp,memory_manager<char>::kernel_2a>::kernel_1a dead_threads; + + bool is_running_; + bool should_stop_; + unsigned long threads_started; + + // restricted functions + multithreaded_object(multithreaded_object&); // copy constructor + multithreaded_object& operator=(multithreaded_object&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#ifdef NO_MAKEFILE +#include "multithreaded_object_extension.cpp" +#endif + +#endif // DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ + diff --git a/ml/dlib/dlib/threads/multithreaded_object_extension_abstract.h b/ml/dlib/dlib/threads/multithreaded_object_extension_abstract.h new file mode 100644 index 000000000..e7862b78f --- /dev/null +++ b/ml/dlib/dlib/threads/multithreaded_object_extension_abstract.h @@ -0,0 +1,186 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class multithreaded_object + { + /*! + INITIAL VALUE + - is_running() == false + - number_of_threads_alive() == 0 + - number_of_threads_registered() == 0 + + WHAT THIS OBJECT REPRESENTS + This object represents a multithreaded object. It is similar to + the threaded_object except it allows you to have many threads in a + single object rather than just one. To use it you inherit from it + and register the member functions in your new class that you want + to run in their own threads by calling register_thread(). Then when + you call start() it will spawn all the registered functions + in their own threads. + !*/ + + public: + + multithreaded_object ( + ); + /*! + ensures + - #*this is properly initialized + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + virtual ~multithreaded_object ( + ) = 0; + /*! + requires + - number_of_threads_alive() == 0 + (i.e. in the destructor for the object you derive from this one you + must wait for all the threads to end.) + ensures + - all resources allocated by *this have been freed. + !*/ + + void clear( + ); + /*! + ensures + - #*this has its initial value + - blocks until all threads have terminated + throws + - std::bad_alloc or dlib::thread_error + if an exception is thrown then *this is unusable + until clear() is called and succeeds + !*/ + + bool is_running ( + ) const; + /*! + ensures + - if (number_of_threads_alive() > 0 && the threads are currently supposed to be executing) then + - returns true + - else + - returns false + !*/ + + unsigned long number_of_threads_alive ( + ) const; + /*! + ensures + - returns the number of threads that are currently alive (i.e. + the number of threads that have started but not yet terminated) + !*/ + + unsigned long number_of_threads_registered ( + ) const; + /*! + ensures + - returns the number of threads that have been registered by + calls to register_thread() + !*/ + + void wait ( + ) const; + /*! + requires + - is not called from one of this object's threads + ensures + - if (number_of_threads_alive() > 0) then + - blocks until all the threads in this object have terminated + (i.e. blocks until number_of_threads_alive() == 0) + !*/ + + void start ( + ); + /*! + ensures + - #number_of_threads_alive() == number_of_threads_registered() + - #is_running() == true + - #should_stop() == false + - all the threads registered are up and running. + throws + - std::bad_alloc or dlib::thread_error + If either of these exceptions are thrown then + #is_running() == false and should_stop() == true + !*/ + + void pause ( + ); + /*! + ensures + - #is_running() == false + !*/ + + void stop ( + ); + /*! + ensures + - #should_stop() == true + - #is_running() == false + !*/ + + protected: + + template < + typename T + > + void register_thread ( + T& object, + void (T::*thread)() + ); + /*! + requires + - (object.*thread)() forms a valid function call + - the thread function does not throw + ensures + - registers the member function pointed to by thread as one of the threads + that runs when is_running() == true + - #number_of_threads_registered() == number_of_threads_registered() + 1 + - if (is_running() == true) + - spawns this new member function in its own thread + - #number_of_threads_alive() += number_of_threads_alive() + 1 + throws + - std::bad_alloc or dlib::thread_error + If either of these exceptions are thrown then + #is_running() == false and should_stop() == true + !*/ + + bool should_stop ( + ) const; + /*! + requires + - is only called from one of the registered threads in this object + ensures + - if (is_running() == false && should_stop() == false) then + - blocks until (#is_running() == true || #should_stop() == true) + - if (this thread is supposed to terminate) then + - returns true + - else + - returns false + !*/ + + private: + + // restricted functions + multithreaded_object(multithreaded_object&); // copy constructor + multithreaded_object& operator=(multithreaded_object&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_MULTITHREADED_OBJECT_EXTENSIOn_ABSTRACT_ + diff --git a/ml/dlib/dlib/threads/parallel_for_extension.h b/ml/dlib/dlib/threads/parallel_for_extension.h new file mode 100644 index 000000000..60b64b1b4 --- /dev/null +++ b/ml/dlib/dlib/threads/parallel_for_extension.h @@ -0,0 +1,676 @@ +// Copyright (C) 2013 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_PARALLEL_FoR_Hh_ +#define DLIB_PARALLEL_FoR_Hh_ + +#include "parallel_for_extension_abstract.h" +#include "thread_pool_extension.h" +#include "../console_progress_indicator.h" +#include "async.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + namespace impl + { + + template <typename T> + class helper_parallel_for + { + public: + helper_parallel_for ( + T& obj_, + void (T::*funct_)(long) + ) : + obj(obj_), + funct(funct_) + {} + + T& obj; + void (T::*funct)(long); + + void process_block (long begin, long end) + { + for (long i = begin; i < end; ++i) + (obj.*funct)(i); + } + }; + + template <typename T> + class helper_parallel_for_funct + { + public: + helper_parallel_for_funct ( + const T& funct_ + ) : funct(funct_) {} + + const T& funct; + + void run(long i) + { + funct(i); + } + }; + + template <typename T> + class helper_parallel_for_funct2 + { + public: + helper_parallel_for_funct2 ( + const T& funct_ + ) : funct(funct_) {} + + const T& funct; + + void run(long begin, long end) + { + funct(begin, end); + } + }; + } + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long, long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + if (tp.num_threads_in_pool() != 0) + { + const long num = end-begin; + const long num_workers = static_cast<long>(tp.num_threads_in_pool()); + // How many samples to process in a single task (aim for chunks_per_thread jobs per worker) + const long block_size = std::max(1L, num/(num_workers*chunks_per_thread)); + for (long i = 0; i < num; i+=block_size) + { + tp.add_task(obj, funct, begin+i, begin+std::min(i+block_size, num)); + } + tp.wait_for_all_tasks(); + } + else + { + // Since there aren't any threads in the pool we might as well just invoke + // the function directly since that's all the thread_pool object would do. + // But doing it ourselves skips a mutex lock. + (obj.*funct)(begin, end); + } + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long, long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + thread_pool tp(num_threads); + parallel_for_blocked(tp, begin, end, obj, funct, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::helper_parallel_for_funct2<T> helper(funct); + parallel_for_blocked(tp, begin, end, helper, &impl::helper_parallel_for_funct2<T>::run, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + thread_pool tp(num_threads); + parallel_for_blocked(tp, begin, end, funct, chunks_per_thread); + } + + template <typename T> + void parallel_for_blocked ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + parallel_for_blocked(default_thread_pool(), begin, end, funct, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::helper_parallel_for<T> helper(obj, funct); + parallel_for_blocked(tp, begin, end, helper, &impl::helper_parallel_for<T>::process_block, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + thread_pool tp(num_threads); + parallel_for(tp, begin, end, obj, funct, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::helper_parallel_for_funct<T> helper(funct); + parallel_for(tp, begin, end, helper, &impl::helper_parallel_for_funct<T>::run, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + thread_pool tp(num_threads); + parallel_for(tp, begin, end, funct, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + parallel_for(default_thread_pool(), begin, end, funct, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + namespace impl + { + template <typename T> + class parfor_verbose_helper + { + public: + parfor_verbose_helper(T& obj_, void (T::*funct_)(long), long begin, long end) : + obj(obj_), funct(funct_), pbar(end-begin) + { + count = 0; + wrote_to_screen = pbar.print_status(0); + } + + ~parfor_verbose_helper() + { + if (wrote_to_screen) + std::cout << std::endl; + } + + mutable long count; + T& obj; + void (T::*funct)(long); + mutable console_progress_indicator pbar; + mutable bool wrote_to_screen; + mutex m; + + void operator()(long i) const + { + (obj.*funct)(i); + { + auto_mutex lock(m); + wrote_to_screen = pbar.print_status(++count) || wrote_to_screen; + } + } + + }; + + template <typename T> + class parfor_verbose_helper3 + { + public: + parfor_verbose_helper3(T& obj_, void (T::*funct_)(long,long), long begin, long end) : + obj(obj_), funct(funct_), pbar(end-begin) + { + count = 0; + wrote_to_screen = pbar.print_status(0); + } + + ~parfor_verbose_helper3() + { + if (wrote_to_screen) + std::cout << std::endl; + } + + mutable long count; + T& obj; + void (T::*funct)(long,long); + mutable console_progress_indicator pbar; + mutable bool wrote_to_screen; + mutex m; + + void operator()(long begin, long end) const + { + (obj.*funct)(begin, end); + { + auto_mutex lock(m); + count += end-begin; + wrote_to_screen = pbar.print_status(count) || wrote_to_screen; + } + } + }; + + template <typename T> + class parfor_verbose_helper2 + { + public: + parfor_verbose_helper2(const T& obj_, long begin, long end) : obj(obj_), pbar(end-begin) + { + count = 0; + wrote_to_screen = pbar.print_status(0); + } + + ~parfor_verbose_helper2() + { + if (wrote_to_screen) + std::cout << std::endl; + } + + mutable long count; + const T& obj; + mutable console_progress_indicator pbar; + mutable bool wrote_to_screen; + mutex m; + + void operator()(long i) const + { + obj(i); + { + auto_mutex lock(m); + wrote_to_screen = pbar.print_status(++count) || wrote_to_screen; + } + } + + void operator()(long begin, long end) const + { + obj(begin, end); + { + auto_mutex lock(m); + count += end-begin; + wrote_to_screen = pbar.print_status(count) || wrote_to_screen; + } + } + }; + } + + template <typename T> + void parallel_for_verbose ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper<T> helper(obj, funct, begin, end); + parallel_for(tp, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper<T> helper(obj, funct, begin, end); + parallel_for(num_threads, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for(tp, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for(num_threads, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for(begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long,long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper3<T> helper(obj, funct, begin, end); + parallel_for_blocked(tp, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long,long), + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper3<T> helper(obj, funct, begin, end); + parallel_for_blocked(num_threads, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for_blocked(tp, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for_blocked(num_threads, begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ) + { + // make sure requires clause is not broken + DLIB_ASSERT(begin <= end && chunks_per_thread > 0, + "\t void parallel_for_blocked_verbose()" + << "\n\t Invalid inputs were given to this function" + << "\n\t begin: " << begin + << "\n\t end: " << end + << "\n\t chunks_per_thread: " << chunks_per_thread + ); + + impl::parfor_verbose_helper2<T> helper(funct, begin, end); + parallel_for_blocked(begin, end, helper, chunks_per_thread); + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_PARALLEL_FoR_Hh_ + diff --git a/ml/dlib/dlib/threads/parallel_for_extension_abstract.h b/ml/dlib/dlib/threads/parallel_for_extension_abstract.h new file mode 100644 index 000000000..ffd2e0c44 --- /dev/null +++ b/ml/dlib/dlib/threads/parallel_for_extension_abstract.h @@ -0,0 +1,469 @@ +// Copyright (C) 2013 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_PARALLEL_FoR_ABSTRACT_Hh_ +#ifdef DLIB_PARALLEL_FoR_ABSTRACT_Hh_ + +#include "thread_pool_extension_abstract.h" +#include "async_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long, long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This is a convenience function for submitting a block of jobs to a thread_pool. + In particular, given the half open range [begin, end), this function will + split the range into approximately tp.num_threads_in_pool()*chunks_per_thread + blocks, which it will then submit to the thread_pool. The given thread_pool + will then call (obj.*funct)() on each of the subranges. + - To be precise, suppose we have broken the range [begin, end) into the + following subranges: + - [begin[0], end[0]) + - [begin[1], end[1]) + - [begin[2], end[2]) + ... + - [begin[n], end[n]) + Then parallel_for_blocked() submits each of these subranges to tp for + processing such that (obj.*funct)(begin[i], end[i]) is invoked for all valid + values of i. Moreover, the subranges are non-overlapping and completely + cover the total range of [begin, end). + - This function will not perform any memory allocations or create any system + resources such as mutex objects. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long, long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + thread_pool tp(num_threads); + parallel_for_blocked(tp, begin, end, obj, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - chunks_per_thread > 0 + - begin <= end + ensures + - This is a convenience function for submitting a block of jobs to a + thread_pool. In particular, given the range [begin, end), this function will + split the range into approximately tp.num_threads_in_pool()*chunks_per_thread + blocks, which it will then submit to the thread_pool. The given thread_pool + will then call funct() on each of the subranges. + - To be precise, suppose we have broken the range [begin, end) into the + following subranges: + - [begin[0], end[0]) + - [begin[1], end[1]) + - [begin[2], end[2]) + ... + - [begin[n], end[n]) + Then parallel_for_blocked() submits each of these subranges to tp for + processing such that funct(begin[i], end[i]) is invoked for all valid values + of i. + - This function will not perform any memory allocations or create any system + resources such as mutex objects. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + thread_pool tp(num_threads); + parallel_for_blocked(tp, begin, end, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + parallel_for_blocked(default_thread_pool(), begin, end, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following function call: + parallel_for_blocked(tp, begin, end, [&](long begin_sub, long end_sub) + { + for (long i = begin_sub; i < end_sub; ++i) + (obj.*funct)(i); + }, chunks_per_thread); + - Therefore, this routine invokes (obj.*funct)(i) for all i in the range + [begin, end). However, it does so using tp.num_threads_in_pool() parallel + threads. + - This function will not perform any memory allocations or create any system + resources such as mutex objects. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + thread_pool tp(num_threads); + parallel_for(tp, begin, end, obj, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following function call: + parallel_for_blocked(tp, begin, end, [&](long begin_sub, long end_sub) + { + for (long i = begin_sub; i < end_sub; ++i) + funct(i); + }, chunks_per_thread); + - Therefore, this routine invokes funct(i) for all i in the range [begin, end). + However, it does so using tp.num_threads_in_pool() parallel threads. + - This function will not perform any memory allocations or create any system + resources such as mutex objects. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + thread_pool tp(num_threads); + parallel_for(tp, begin, end, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is equivalent to the following block of code: + parallel_for(default_thread_pool(), begin, end, funct, chunks_per_thread); + !*/ + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for() routine defined above except + that it will print messages to cout showing the progress in executing the + parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for() routine defined above except + that it will print messages to cout showing the progress in executing the + parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for() routine defined above except + that it will print messages to cout showing the progress in executing the + parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for() routine defined above except + that it will print messages to cout showing the progress in executing the + parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_verbose ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for() routine defined above except + that it will print messages to cout showing the progress in executing the + parallel for loop. + - It will also use the default_thread_pool(). + !*/ + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + thread_pool& tp, + long begin, + long end, + T& obj, + void (T::*funct)(long,long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for_blocked() routine defined + above except that it will print messages to cout showing the progress in + executing the parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + unsigned long num_threads, + long begin, + long end, + T& obj, + void (T::*funct)(long,long), + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for_blocked() routine defined + above except that it will print messages to cout showing the progress in + executing the parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + thread_pool& tp, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for_blocked() routine defined + above except that it will print messages to cout showing the progress in + executing the parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + unsigned long num_threads, + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for_blocked() routine defined + above except that it will print messages to cout showing the progress in + executing the parallel for loop. + !*/ + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void parallel_for_blocked_verbose ( + long begin, + long end, + const T& funct, + long chunks_per_thread = 8 + ); + /*! + requires + - begin <= end + - chunks_per_thread > 0 + ensures + - This function is identical to the parallel_for_blocked() routine defined + above except that it will print messages to cout showing the progress in + executing the parallel for loop. + - It will also use the default_thread_pool() + !*/ + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_PARALLEL_FoR_ABSTRACT_Hh_ + + diff --git a/ml/dlib/dlib/threads/posix.h b/ml/dlib/dlib/threads/posix.h new file mode 100644 index 000000000..7226743e1 --- /dev/null +++ b/ml/dlib/dlib/threads/posix.h @@ -0,0 +1,6 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEl_1_ +#include "threads_kernel_2.h" +#endif + diff --git a/ml/dlib/dlib/threads/read_write_mutex_extension.h b/ml/dlib/dlib/threads/read_write_mutex_extension.h new file mode 100644 index 000000000..20e5d5ed8 --- /dev/null +++ b/ml/dlib/dlib/threads/read_write_mutex_extension.h @@ -0,0 +1,177 @@ +// Copyright (C) 2010 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_READ_WRITE_MUTEX_EXTENSIOn_ +#define DLIB_READ_WRITE_MUTEX_EXTENSIOn_ + +#include "threads_kernel.h" +#include "read_write_mutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class read_write_mutex + { + /*! + INITIAL VALUE + - max_locks == defined by constructor + - available_locks == max_locks + - write_lock_in_progress == false + - write_lock_active == false + + CONVENTION + - Each time someone gets a read only lock they take one of the "available locks" + and each write lock takes all possible locks (i.e. max_locks). The number of + available locks is recorded in available_locks. Any time you try to lock this + object and there aren't available locks you have to wait. + + - max_locks == max_readonly_locks() + + - if (some thread is on the process of obtaining a write lock) then + - write_lock_in_progress == true + - else + - write_lock_in_progress == false + + - if (some thread currently has a write lock on this mutex) then + - write_lock_active == true + - else + - write_lock_active == false + !*/ + + public: + + read_write_mutex ( + ) : s(m), + max_locks(0xFFFFFFFF), + available_locks(max_locks), + write_lock_in_progress(false), + write_lock_active(false) + {} + + explicit read_write_mutex ( + unsigned long max_locks_ + ) : s(m), + max_locks(max_locks_), + available_locks(max_locks_), + write_lock_in_progress(false), + write_lock_active(false) + { + // make sure requires clause is not broken + DLIB_ASSERT(max_locks > 0, + "\t read_write_mutex::read_write_mutex(max_locks)" + << "\n\t You must give a non-zero value for max_locks" + << "\n\t this: " << this + ); + } + + ~read_write_mutex ( + ) + {} + + void lock ( + ) const + { + m.lock(); + + // If another write lock is already in progress then wait for it to finish + // before we start trying to grab all the available locks. This way we + // don't end up fighting over the locks. + while (write_lock_in_progress) + s.wait(); + + // grab the right to perform a write lock + write_lock_in_progress = true; + + // now start grabbing all the locks + unsigned long locks_obtained = available_locks; + available_locks = 0; + while (locks_obtained != max_locks) + { + s.wait(); + locks_obtained += available_locks; + available_locks = 0; + } + + write_lock_in_progress = false; + write_lock_active = true; + + m.unlock(); + } + + void unlock ( + ) const + { + m.lock(); + + // only do something if there really was a lock in place + if (write_lock_active) + { + available_locks = max_locks; + write_lock_active = false; + s.broadcast(); + } + + m.unlock(); + } + + void lock_readonly ( + ) const + { + m.lock(); + + while (available_locks == 0) + s.wait(); + + --available_locks; + + m.unlock(); + } + + void unlock_readonly ( + ) const + { + m.lock(); + + // If this condition is false then it means there are no more readonly locks + // to free. So we don't do anything. + if (available_locks != max_locks && !write_lock_active) + { + ++available_locks; + + // only perform broadcast when there is another thread that might be listening + if (available_locks == 1 || write_lock_in_progress) + { + s.broadcast(); + } + } + + m.unlock(); + } + + unsigned long max_readonly_locks ( + ) const + { + return max_locks; + } + + private: + mutex m; + signaler s; + const unsigned long max_locks; + mutable unsigned long available_locks; + mutable bool write_lock_in_progress; + mutable bool write_lock_active; + + // restricted functions + read_write_mutex(read_write_mutex&); // copy constructor + read_write_mutex& operator=(read_write_mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_READ_WRITE_MUTEX_EXTENSIOn_ + + diff --git a/ml/dlib/dlib/threads/read_write_mutex_extension_abstract.h b/ml/dlib/dlib/threads/read_write_mutex_extension_abstract.h new file mode 100644 index 000000000..18672b057 --- /dev/null +++ b/ml/dlib/dlib/threads/read_write_mutex_extension_abstract.h @@ -0,0 +1,146 @@ +// Copyright (C) 2010 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_READWRITE_MUTEX_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_READWRITE_MUTEX_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class read_write_mutex + { + /*! + INITIAL VALUE + read_write_mutex is in the fully unlocked state + + WHAT THIS OBJECT REPRESENTS + This object represents a mutex intended to be used for synchronous + thread control of shared data. When a thread wants to access some + shared data it locks out other threads by calling lock() and calls + unlock() when it is finished. + + This mutex also has the additional ability to distinguish between + a lock for the purposes of modifying some shared data, a write lock, + and a lock for the purposes of only reading shared data, a readonly + lock. The lock() and unlock() functions are used for write locks while + the lock_readonly() and unlock_readonly() are for readonly locks. + + The difference between a readonly and write lock can be understood as + follows. The read_write_mutex will allow many threads to obtain simultaneous + readonly locks but will only allow a single thread to obtain a write lock. + Moreover, while the write lock is obtained no other threads are allowed + to have readonly locks. + !*/ + public: + + read_write_mutex ( + ); + /*! + ensures + - #*this is properly initialized + - max_readonly_locks() == 0xFFFFFFFF + (i.e. about 4 billion) + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the read_write_mutex. + !*/ + + explicit read_write_mutex ( + unsigned long max_locks + ); + /*! + requires + - max_locks > 0 + ensures + - #*this is properly initialized + - max_readonly_locks() == max_locks + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the read_write_mutex. + !*/ + + ~read_write_mutex ( + ); + /*! + requires + - *this is not locked + ensures + - all resources allocated by *this have been freed + !*/ + + void lock ( + ) const; + /*! + requires + - The thread calling this function does not have any kind of lock on this + object + ensures + - if (there is any kind of lock on *this) then + - the calling thread is put to sleep until a write lock becomes available. + Once available, a write lock is obtained on this mutex and this function + terminates. + - else + - a write lock is obtained on this mutex and the calling thread is not put to sleep + !*/ + + void unlock ( + ) const; + /*! + ensures + - if (there is a write lock on *this) then + - #*this is unlocked (i.e. other threads may now lock this object) + - else + - the call to unlock() has no effect + !*/ + + unsigned long max_readonly_locks ( + ) const; + /*! + ensures + - returns the maximum number of concurrent readonly locks this object will allow. + !*/ + + void lock_readonly ( + ) const; + /*! + requires + - The thread calling this function does not already have a write + lock on this object + ensures + - if (there is a write lock on *this or there are no free readonly locks) then + - the calling thread is put to sleep until there is no longer a write lock + and a free readonly lock is available. Once this is the case, a readonly + lock is obtained and this function terminates. + - else + - a readonly lock is obtained on *this and the calling thread is not put + to sleep. Note that multiple readonly locks can be obtained at once. + !*/ + + void unlock_readonly ( + ) const; + /*! + ensures + - if (there is a readonly lock on *this) then + - one readonly lock is removed from *this. + - else + - the call to unlock_readonly() has no effect. + !*/ + + private: + // restricted functions + read_write_mutex(read_write_mutex&); // copy constructor + read_write_mutex& operator=(read_write_mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_READWRITE_MUTEX_EXTENSIOn_ABSTRACT_ + + diff --git a/ml/dlib/dlib/threads/rmutex_extension.h b/ml/dlib/dlib/threads/rmutex_extension.h new file mode 100644 index 000000000..b7bf998be --- /dev/null +++ b/ml/dlib/dlib/threads/rmutex_extension.h @@ -0,0 +1,109 @@ +// Copyright (C) 2005 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_RMUTEX_EXTENSIOn_ +#define DLIB_RMUTEX_EXTENSIOn_ + +#include "threads_kernel.h" +#include "rmutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class rmutex + { + /*! + INITIAL VALUE + count == 0 + thread_id == 0 + + CONVENTION + - count == lock_count() + + - if (no thread currently has a lock on this mutex) then + - count == 0 + - else + - count == the number of times the thread that owns this mutex has + called lock() + - thread_id == the id of this thread. + !*/ + public: + + rmutex ( + ) : s(m), + thread_id(0), + count(0) + {} + + ~rmutex ( + ) + {} + + unsigned long lock_count ( + ) const + { + return count; + } + + void lock ( + unsigned long times = 1 + ) const + { + const thread_id_type current_thread_id = get_thread_id(); + m.lock(); + if (thread_id == current_thread_id) + { + // we already own this mutex in this case + count += times; + } + else + { + // wait for our turn to claim this rmutex + while (count != 0) + s.wait(); + + count = times; + thread_id = current_thread_id; + } + m.unlock(); + } + + void unlock ( + unsigned long times = 1 + ) const + { + const thread_id_type current_thread_id = get_thread_id(); + m.lock(); + if (thread_id == current_thread_id) + { + if (count <= times) + { + count = 0; + s.signal(); + } + else + { + count -= times; + } + } + m.unlock(); + } + + private: + mutex m; + signaler s; + mutable thread_id_type thread_id; + mutable unsigned long count; + + // restricted functions + rmutex(rmutex&); // copy constructor + rmutex& operator=(rmutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_RMUTEX_EXTENSIOn_ + diff --git a/ml/dlib/dlib/threads/rmutex_extension_abstract.h b/ml/dlib/dlib/threads/rmutex_extension_abstract.h new file mode 100644 index 000000000..144dbf4d7 --- /dev/null +++ b/ml/dlib/dlib/threads/rmutex_extension_abstract.h @@ -0,0 +1,107 @@ +// Copyright (C) 2005 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_RMUTEX_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_RMUTEX_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class rmutex + { + /*! + INITIAL VALUE + rmutex is in the unlocked state + + WHAT THIS OBJECT REPRESENTS + This object represents a recursive mutex intended to be used for synchronous + thread control of shared data. When a thread wants to access some + shared data it locks out other threads by calling lock() and calls + unlock() when it is finished. + + The difference between this and the normal mutex object is that it is safe to + call lock() from a thread that already has a lock on this mutex. Doing + so just increments a counter but otherwise has no effect on the mutex. + Note that unlock() must be called for each call to lock() to release the + mutex. + !*/ + public: + + rmutex ( + ); + /*! + ensures + - #*this is properly initialized + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the rmutex. + !*/ + + ~rmutex ( + ); + /*! + requires + - *this is not locked + ensures + - all resources allocated by *this have been freed + !*/ + + unsigned long lock_count ( + ) const; + /*! + requires + - the calling thread has a lock on this mutex + ensures + - returns the number of times the thread has called lock() + !*/ + + void lock ( + unsigned long times = 1 + ) const; + /*! + ensures + - if (*this is currently locked by another thread) then + - the thread that called lock() on *this is put to sleep until + it becomes available. + - #lock_count() == times + - if (*this is currently unlocked) then + - #*this becomes locked and the current thread is NOT put to sleep + but now "owns" #*this + - #lock_count() == times + - if (*this is locked and owned by the current thread) then + - the calling thread retains its lock on *this and isn't put to sleep. + - #lock_count() == lock_count() + times + !*/ + + void unlock ( + unsigned long times = 1 + ) const; + /*! + ensures + - if (*this is currently locked and owned by the thread calling unlock) then + - if (lock_count() <= times ) then + - #*this is unlocked (i.e. other threads may now lock this object) + - else + - #*this will remain locked + - #lock_count() == lock_count() - times + - else + - the call to unlock() has no effect + !*/ + + + private: + // restricted functions + rmutex(rmutex&); // copy constructor + rmutex& operator=(rmutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_RMUTEX_EXTENSIOn_ABSTRACT_ + diff --git a/ml/dlib/dlib/threads/rsignaler_extension.h b/ml/dlib/dlib/threads/rsignaler_extension.h new file mode 100644 index 000000000..bfb5a7ecb --- /dev/null +++ b/ml/dlib/dlib/threads/rsignaler_extension.h @@ -0,0 +1,90 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_RSIGNALER_EXTENSIOn_ +#define DLIB_RSIGNALER_EXTENSIOn_ + +#include "rsignaler_extension_abstract.h" +#include "rmutex_extension.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class rsignaler + { + public: + rsignaler ( + const rmutex& associated_mutex + ) : + assoc_mutex(associated_mutex), + s(m) + {} + + ~rsignaler ( + ) + {} + + void wait ( + ) const + { + m.lock(); + const unsigned long lock_count = assoc_mutex.lock_count(); + assoc_mutex.unlock(lock_count); + s.wait(); + m.unlock(); + assoc_mutex.lock(lock_count); + } + + bool wait_or_timeout ( + unsigned long milliseconds + ) const + { + m.lock(); + const unsigned long lock_count = assoc_mutex.lock_count(); + assoc_mutex.unlock(lock_count); + bool res = s.wait_or_timeout(milliseconds); + m.unlock(); + assoc_mutex.lock(lock_count); + return res; + } + + void signal ( + ) const + { + m.lock(); + s.signal(); + m.unlock(); + } + + void broadcast ( + ) const + { + m.lock(); + s.broadcast(); + m.unlock(); + } + + const rmutex& get_mutex ( + ) const { return assoc_mutex; } + + private: + + const rmutex& assoc_mutex; + mutex m; + signaler s; + + + // restricted functions + rsignaler(rsignaler&); // copy constructor + rsignaler& operator=(rsignaler&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_RSIGNALER_EXTENSIOn_ + + + diff --git a/ml/dlib/dlib/threads/rsignaler_extension_abstract.h b/ml/dlib/dlib/threads/rsignaler_extension_abstract.h new file mode 100644 index 000000000..ae5f450d7 --- /dev/null +++ b/ml/dlib/dlib/threads/rsignaler_extension_abstract.h @@ -0,0 +1,123 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_RSIGNALER_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_RSIGNALER_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" +#include "rmutex_extension_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class rsignaler + { + /*! + WHAT THIS OBJECT REPRESENTS + This object represents an event signaling system for threads. It gives + a thread the ability to wake up other threads that are waiting for a + particular signal. + + Each rsignaler object is associated with one and only one rmutex object. + More than one rsignaler object may be associated with a single rmutex + but a signaler object may only be associated with a single rmutex. + + NOTE: + You must guard against spurious wakeups. This means that a thread + might return from a call to wait even if no other thread called + signal. This is rare but must be guarded against. + + Also note that this object is identical to the signaler object + except that it works with rmutex objects rather than mutex objects. + !*/ + + public: + + rsignaler ( + const rmutex& associated_mutex + ); + /*! + ensures + - #*this is properly initialized + - #get_mutex() == associated_mutex + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the signaler. + !*/ + + + ~rsignaler ( + ); + /*! + ensures + - all resources allocated by *this have been freed + !*/ + + void wait ( + ) const; + /*! + requires + - get_mutex() is locked and owned by the calling thread + ensures + - atomically unlocks get_mutex() and blocks the calling thread + - calling thread may wake if another thread calls signal() or broadcast() + on *this + - when wait() returns the calling thread again has a lock on get_mutex() + !*/ + + bool wait_or_timeout ( + unsigned long milliseconds + ) const; + /*! + requires + - get_mutex() is locked and owned by the calling thread + ensures + - atomically unlocks get_mutex() and blocks the calling thread + - calling thread may wake if another thread calls signal() or broadcast() + on *this + - after the specified number of milliseconds has elapsed the calling thread + will wake once get_mutex() is free + - when wait returns the calling thread again has a lock on get_mutex() + + - returns false if the call to wait_or_timeout timed out + - returns true if the call did not time out + !*/ + + void signal ( + ) const; + /*! + ensures + - if (at least one thread is waiting on *this) then + - at least one of the waiting threads will wake + !*/ + + void broadcast ( + ) const; + /*! + ensures + - any and all threads waiting on *this will wake + !*/ + + const rmutex& get_mutex ( + ) const; + /*! + ensures + - returns a const reference to the rmutex associated with *this + !*/ + + + private: + // restricted functions + rsignaler(rsignaler&); // copy constructor + rsignaler& operator=(rsignaler&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_RSIGNALER_EXTENSIOn_ABSTRACT_ + + diff --git a/ml/dlib/dlib/threads/thread_function_extension.h b/ml/dlib/dlib/threads/thread_function_extension.h new file mode 100644 index 000000000..7ecdd6520 --- /dev/null +++ b/ml/dlib/dlib/threads/thread_function_extension.h @@ -0,0 +1,215 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_FUNCTIOn_ +#define DLIB_THREAD_FUNCTIOn_ + +#include <memory> + +#include "thread_function_extension_abstract.h" +#include "threads_kernel.h" +#include "auto_mutex_extension.h" +#include "threaded_object_extension.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class thread_function : private threaded_object + { + + class base_funct + { + public: + virtual void go() = 0; + virtual ~base_funct() {} + }; + + template <typename F, typename T1, typename T2, typename T3, typename T4> + class super_funct_4 : public base_funct + { + public: + super_funct_4 ( F funct, T1 arg1, T2 arg2, T3 arg3, T4 arg4) : + f(funct), + a1(arg1), + a2(arg2), + a3(arg3), + a4(arg4) + { + } + + void go() { f(a1, a2, a3, a4); } + + + F f; + T1 a1; + T2 a2; + T3 a3; + T4 a4; + }; + + template <typename F, typename T1, typename T2, typename T3> + class super_funct_3 : public base_funct + { + public: + super_funct_3 ( F funct, T1 arg1, T2 arg2, T3 arg3): + f(funct), + a1(arg1), + a2(arg2), + a3(arg3) + { + } + + void go() { f(a1, a2, a3); } + + + F f; + T1 a1; + T2 a2; + T3 a3; + }; + + template <typename F, typename T1, typename T2> + class super_funct_2 : public base_funct + { + public: + super_funct_2 ( F funct, T1 arg1, T2 arg2) : + f(funct), + a1(arg1), + a2(arg2) + { + } + + void go() { f(a1, a2); } + + + F f; + T1 a1; + T2 a2; + }; + + template <typename F, typename T> + class super_funct_1 : public base_funct + { + public: + super_funct_1 ( F funct, T arg) : f(funct), a(arg) + { + } + + void go() { f(a); } + + + F f; + T a; + }; + + template <typename F> + class super_funct_0 : public base_funct + { + public: + super_funct_0 ( F funct) : f(funct) + { + } + + void go() { f(); } + + F f; + }; + + public: + + template <typename F> + thread_function ( + F funct + ) + { + f.reset(new super_funct_0<F>(funct)); + start(); + } + + template <typename F, typename T> + thread_function ( + F funct, + T arg + ) + { + f.reset(new super_funct_1<F,T>(funct,arg)); + start(); + } + + template <typename F, typename T1, typename T2> + thread_function ( + F funct, + T1 arg1, + T2 arg2 + ) + { + f.reset(new super_funct_2<F,T1,T2>(funct, arg1, arg2)); + start(); + } + + template <typename F, typename T1, typename T2, typename T3> + thread_function ( + F funct, + T1 arg1, + T2 arg2, + T3 arg3 + ) + { + f.reset(new super_funct_3<F,T1,T2,T3>(funct, arg1, arg2, arg3)); + start(); + } + + template <typename F, typename T1, typename T2, typename T3, typename T4> + thread_function ( + F funct, + T1 arg1, + T2 arg2, + T3 arg3, + T4 arg4 + ) + { + f.reset(new super_funct_4<F,T1,T2,T3,T4>(funct, arg1, arg2, arg3, arg4)); + start(); + } + + ~thread_function ( + ) + { + threaded_object::wait(); + } + + bool is_alive ( + ) const + { + return threaded_object::is_alive(); + } + + void wait ( + ) const + { + threaded_object::wait(); + } + + private: + + void thread () + { + f->go(); + } + + std::unique_ptr<base_funct> f; + + // restricted functions + thread_function(thread_function&); // copy constructor + thread_function& operator=(thread_function&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREAD_FUNCTIOn_ + + + diff --git a/ml/dlib/dlib/threads/thread_function_extension_abstract.h b/ml/dlib/dlib/threads/thread_function_extension_abstract.h new file mode 100644 index 000000000..65ea998ac --- /dev/null +++ b/ml/dlib/dlib/threads/thread_function_extension_abstract.h @@ -0,0 +1,146 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREAD_FUNCTIOn_ABSTRACT_ +#ifdef DLIB_THREAD_FUNCTIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class thread_function + { + /*! + WHAT THIS OBJECT REPRESENTS + This object represents a thread on a global C++ function or function + object. That is, it allows you to run a function in its own thread. + !*/ + public: + + template <typename F> + thread_function ( + F funct + ); + /*! + ensures + - #*this is properly initialized + - the function funct has been started in its own thread + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + template <typename F, typename T1> + thread_function ( + F funct, + T1 arg1 + ); + /*! + ensures + - #*this is properly initialized + - A thread has been created and it will call funct(arg1) + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + template <typename F, typename T1, typename T2> + thread_function ( + F funct, + T1 arg1, + T2 arg2 + ); + /*! + ensures + - #*this is properly initialized + - A thread has been created and it will call funct(arg1, arg2) + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + template <typename F, typename T1, typename T2, typename T3> + thread_function ( + F funct, + T1 arg1, + T2 arg2, + T3 arg3 + ); + /*! + ensures + - #*this is properly initialized + - A thread has been created and it will call funct(arg1, arg2, arg3) + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + template <typename F, typename T1, typename T2, typename T3, typename T4> + thread_function ( + F funct, + T1 arg1, + T2 arg2, + T3 arg3, + T4 arg4 + ); + /*! + ensures + - #*this is properly initialized + - A thread has been created and it will call funct(arg1, arg2, arg3, arg4) + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + ~thread_function ( + ); + /*! + ensures + - all resources allocated by *this have been freed. + - blocks until is_alive() == false + !*/ + + bool is_alive ( + ) const; + /*! + ensures + - if (this object's thread has yet to terminate) then + - returns true + - else + - returns false + !*/ + + void wait ( + ) const; + /*! + ensures + - if (is_alive() == true) then + - blocks until this object's thread terminates + !*/ + + private: + + // restricted functions + thread_function(thread_function&); // copy constructor + thread_function& operator=(thread_function&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREAD_FUNCTIOn_ABSTRACT_ + + diff --git a/ml/dlib/dlib/threads/thread_pool_extension.cpp b/ml/dlib/dlib/threads/thread_pool_extension.cpp new file mode 100644 index 000000000..00d99b910 --- /dev/null +++ b/ml/dlib/dlib/threads/thread_pool_extension.cpp @@ -0,0 +1,347 @@ +// Copyright (C) 2008 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_POOl_CPPh_ +#define DLIB_THREAD_POOl_CPPh_ + +#include "thread_pool_extension.h" +#include <memory> + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + thread_pool_implementation:: + thread_pool_implementation ( + unsigned long num_threads + ) : + task_done_signaler(m), + task_ready_signaler(m), + we_are_destructing(false) + { + tasks.resize(num_threads); + threads.resize(num_threads); + for (unsigned long i = 0; i < num_threads; ++i) + { + threads[i] = std::thread([&](){this->thread();}); + } + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool_implementation:: + shutdown_pool ( + ) + { + { + auto_mutex M(m); + + // first wait for all pending tasks to finish + bool found_task = true; + while (found_task) + { + found_task = false; + for (unsigned long i = 0; i < tasks.size(); ++i) + { + // If task bucket i has a task that is currently supposed to be processed + if (tasks[i].is_empty() == false) + { + found_task = true; + break; + } + } + + if (found_task) + task_done_signaler.wait(); + } + + // now tell the threads to kill themselves + we_are_destructing = true; + task_ready_signaler.broadcast(); + } + + // wait for all threads to terminate + for (auto& t : threads) + t.join(); + threads.clear(); + + // Throw any unhandled exceptions. Since shutdown_pool() is only called in the + // destructor this will kill the program. + for (auto&& task : tasks) + task.propagate_exception(); + } + +// ---------------------------------------------------------------------------------------- + + thread_pool_implementation:: + ~thread_pool_implementation() + { + shutdown_pool(); + } + +// ---------------------------------------------------------------------------------------- + + unsigned long thread_pool_implementation:: + num_threads_in_pool ( + ) const + { + auto_mutex M(m); + return tasks.size(); + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool_implementation:: + wait_for_task ( + uint64 task_id + ) const + { + auto_mutex M(m); + if (tasks.size() != 0) + { + const unsigned long idx = task_id_to_index(task_id); + while (tasks[idx].task_id == task_id) + task_done_signaler.wait(); + + for (auto&& task : tasks) + task.propagate_exception(); + } + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool_implementation:: + wait_for_all_tasks ( + ) const + { + const thread_id_type thread_id = get_thread_id(); + + auto_mutex M(m); + bool found_task = true; + while (found_task) + { + found_task = false; + for (unsigned long i = 0; i < tasks.size(); ++i) + { + // If task bucket i has a task that is currently supposed to be processed + // and it originated from the calling thread + if (tasks[i].is_empty() == false && tasks[i].thread_id == thread_id) + { + found_task = true; + break; + } + } + + if (found_task) + task_done_signaler.wait(); + } + + // throw any exceptions generated by the tasks + for (auto&& task : tasks) + task.propagate_exception(); + } + +// ---------------------------------------------------------------------------------------- + + bool thread_pool_implementation:: + is_worker_thread ( + const thread_id_type id + ) const + { + for (unsigned long i = 0; i < worker_thread_ids.size(); ++i) + { + if (worker_thread_ids[i] == id) + return true; + } + + // if there aren't any threads in the pool then we consider all threads + // to be worker threads + if (tasks.size() == 0) + return true; + else + return false; + } + +// ---------------------------------------------------------------------------------------- + + void thread_pool_implementation:: + thread ( + ) + { + { + // save the id of this worker thread into worker_thread_ids + auto_mutex M(m); + thread_id_type id = get_thread_id(); + worker_thread_ids.push_back(id); + } + + task_state_type task; + while (we_are_destructing == false) + { + long idx = 0; + + // wait for a task to do + { auto_mutex M(m); + while ( (idx = find_ready_task()) == -1 && we_are_destructing == false) + task_ready_signaler.wait(); + + if (we_are_destructing) + break; + + tasks[idx].is_being_processed = true; + task = tasks[idx]; + } + + std::exception_ptr eptr = nullptr; + try + { + // now do the task + if (task.bfp) + task.bfp(); + else if (task.mfp0) + task.mfp0(); + else if (task.mfp1) + task.mfp1(task.arg1); + else if (task.mfp2) + task.mfp2(task.arg1, task.arg2); + } + catch(...) + { + eptr = std::current_exception(); + } + + // Now let others know that we finished the task. We do this + // by clearing out the state of this task + { auto_mutex M(m); + tasks[idx].is_being_processed = false; + tasks[idx].task_id = 0; + tasks[idx].bfp.clear(); + tasks[idx].mfp0.clear(); + tasks[idx].mfp1.clear(); + tasks[idx].mfp2.clear(); + tasks[idx].arg1 = 0; + tasks[idx].arg2 = 0; + tasks[idx].eptr = eptr; + task_done_signaler.broadcast(); + } + + } + } + +// ---------------------------------------------------------------------------------------- + + long thread_pool_implementation:: + find_empty_task_slot ( + ) const + { + for (auto&& task : tasks) + task.propagate_exception(); + + for (unsigned long i = 0; i < tasks.size(); ++i) + { + if (tasks[i].is_empty()) + return i; + } + + return -1; + } + +// ---------------------------------------------------------------------------------------- + + long thread_pool_implementation:: + find_ready_task ( + ) const + { + for (unsigned long i = 0; i < tasks.size(); ++i) + { + if (tasks[i].is_ready()) + return i; + } + + return -1; + } + +// ---------------------------------------------------------------------------------------- + + uint64 thread_pool_implementation:: + make_next_task_id ( + long idx + ) + { + uint64 id = tasks[idx].next_task_id * tasks.size() + idx; + tasks[idx].next_task_id += 1; + return id; + } + +// ---------------------------------------------------------------------------------------- + + unsigned long thread_pool_implementation:: + task_id_to_index ( + uint64 id + ) const + { + return static_cast<unsigned long>(id%tasks.size()); + } + +// ---------------------------------------------------------------------------------------- + + uint64 thread_pool_implementation:: + add_task_internal ( + const bfp_type& bfp, + std::shared_ptr<function_object_copy>& item + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + M.unlock(); + bfp(); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].bfp = bfp; + tasks[idx].function_copy.swap(item); + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + +// ---------------------------------------------------------------------------------------- + + bool thread_pool_implementation:: + is_task_thread ( + ) const + { + auto_mutex M(m); + return is_worker_thread(get_thread_id()); + } + +// ---------------------------------------------------------------------------------------- + +} + + +#endif // DLIB_THREAD_POOl_CPPh_ + diff --git a/ml/dlib/dlib/threads/thread_pool_extension.h b/ml/dlib/dlib/threads/thread_pool_extension.h new file mode 100644 index 000000000..bc2e1782c --- /dev/null +++ b/ml/dlib/dlib/threads/thread_pool_extension.h @@ -0,0 +1,1392 @@ +// Copyright (C) 2008 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_POOl_Hh_ +#define DLIB_THREAD_POOl_Hh_ + +#include <exception> +#include <memory> +#include <thread> + +#include "thread_pool_extension_abstract.h" +#include "multithreaded_object_extension.h" +#include "../member_function_pointer.h" +#include "../bound_function_pointer.h" +#include "threads_kernel.h" +#include "auto_mutex_extension.h" +#include "../uintn.h" +#include "../array.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class thread_pool_implementation; + + template < + typename T + > + class future + { + /*! + INITIAL VALUE + - task_id == 0 + - tp.get() == 0 + + CONVENTION + - is_ready() == (tp.get() == 0) + - get() == var + + - if (tp.get() != 0) + - tp == a pointer to the thread_pool_implementation that is using this future object + - task_id == the task id of the task in the thread pool tp that is using + this future object. + !*/ + public: + + future ( + ) : task_id(0) {} + + future ( + const T& item + ) : task_id(0), var(item) {} + + future ( + const future& item + ) :task_id(0), var(item.get()) {} + + ~future ( + ) { wait(); } + + future& operator=( + const T& item + ) { get() = item; return *this; } + + future& operator=( + const future& item + ) { get() = item.get(); return *this; } + + operator T& ( + ) { return get(); } + + operator const T& ( + ) const { return get(); } + + T& get ( + ) { wait(); return var; } + + const T& get ( + ) const { wait(); return var; } + + bool is_ready ( + ) const { return tp.get() == 0; } + + private: + + friend class thread_pool; + + inline void wait () const; + + mutable uint64 task_id; + mutable std::shared_ptr<thread_pool_implementation> tp; + + T var; + }; + +// ---------------------------------------------------------------------------------------- + + template <typename T> + inline void swap ( + future<T>& a, + future<T>& b + ) { dlib::exchange(a.get(), b.get()); } + // Note that dlib::exchange() just calls std::swap. I'm only using it because + // this works around some bugs in certain compilers. + +// ---------------------------------------------------------------------------------------- + + template <typename T> bool operator== (const future<T>& a, const future<T>& b) { return a.get() == b.get(); } + template <typename T> bool operator!= (const future<T>& a, const future<T>& b) { return a.get() != b.get(); } + template <typename T> bool operator<= (const future<T>& a, const future<T>& b) { return a.get() <= b.get(); } + template <typename T> bool operator>= (const future<T>& a, const future<T>& b) { return a.get() >= b.get(); } + template <typename T> bool operator< (const future<T>& a, const future<T>& b) { return a.get() < b.get(); } + template <typename T> bool operator> (const future<T>& a, const future<T>& b) { return a.get() > b.get(); } + + template <typename T> bool operator== (const future<T>& a, const T& b) { return a.get() == b; } + template <typename T> bool operator== (const T& a, const future<T>& b) { return a == b.get(); } + template <typename T> bool operator!= (const future<T>& a, const T& b) { return a.get() != b; } + template <typename T> bool operator!= (const T& a, const future<T>& b) { return a != b.get(); } + template <typename T> bool operator<= (const future<T>& a, const T& b) { return a.get() <= b; } + template <typename T> bool operator<= (const T& a, const future<T>& b) { return a <= b.get(); } + template <typename T> bool operator>= (const future<T>& a, const T& b) { return a.get() >= b; } + template <typename T> bool operator>= (const T& a, const future<T>& b) { return a >= b.get(); } + template <typename T> bool operator< (const future<T>& a, const T& b) { return a.get() < b; } + template <typename T> bool operator< (const T& a, const future<T>& b) { return a < b.get(); } + template <typename T> bool operator> (const future<T>& a, const T& b) { return a.get() > b; } + template <typename T> bool operator> (const T& a, const future<T>& b) { return a > b.get(); } + +// ---------------------------------------------------------------------------------------- + + class thread_pool_implementation + { + /*! + CONVENTION + - num_threads_in_pool() == tasks.size() + - if (the destructor has been called) then + - we_are_destructing == true + - else + - we_are_destructing == false + + - is_task_thread() == is_worker_thread(get_thread_id()) + + - m == the mutex used to protect everything in this object + - worker_thread_ids == an array that contains the thread ids for + all the threads in the thread pool + !*/ + typedef bound_function_pointer::kernel_1a_c bfp_type; + + friend class thread_pool; + explicit thread_pool_implementation ( + unsigned long num_threads + ); + + public: + ~thread_pool_implementation( + ); + + void wait_for_task ( + uint64 task_id + ) const; + + unsigned long num_threads_in_pool ( + ) const; + + void wait_for_all_tasks ( + ) const; + + bool is_task_thread ( + ) const; + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)() + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + M.unlock(); + (obj.*funct)(); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp0.set(obj,funct); + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long), + long arg1 + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + M.unlock(); + (obj.*funct)(arg1); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp1.set(obj,funct); + tasks[idx].arg1 = arg1; + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long,long), + long arg1, + long arg2 + ) + { + auto_mutex M(m); + const thread_id_type my_thread_id = get_thread_id(); + + // find a thread that isn't doing anything + long idx = find_empty_task_slot(); + if (idx == -1 && is_worker_thread(my_thread_id)) + { + // this function is being called from within a worker thread and there + // aren't any other worker threads free so just perform the task right + // here + + M.unlock(); + (obj.*funct)(arg1, arg2); + + // return a task id that is both non-zero and also one + // that is never normally returned. This way calls + // to wait_for_task() will never block given this id. + return 1; + } + + // wait until there is a thread that isn't doing anything + while (idx == -1) + { + task_done_signaler.wait(); + idx = find_empty_task_slot(); + } + + tasks[idx].thread_id = my_thread_id; + tasks[idx].task_id = make_next_task_id(idx); + tasks[idx].mfp2.set(obj,funct); + tasks[idx].arg1 = arg1; + tasks[idx].arg2 = arg2; + + task_ready_signaler.signal(); + + return tasks[idx].task_id; + } + + struct function_object_copy + { + virtual ~function_object_copy(){} + }; + + template <typename T> + struct function_object_copy_instance : function_object_copy + { + function_object_copy_instance(const T& item_) : item(item_) {} + T item; + virtual ~function_object_copy_instance(){} + }; + + uint64 add_task_internal ( + const bfp_type& bfp, + std::shared_ptr<function_object_copy>& item + ); + /*! + ensures + - adds a task to call the given bfp object. + - swaps item into the internal task object which will have a lifetime + at least as long as the running task. + - returns the task id for this new task + !*/ + + uint64 add_task_internal ( + const bfp_type& bfp + ) { std::shared_ptr<function_object_copy> temp; return add_task_internal(bfp, temp); } + /*! + ensures + - adds a task to call the given bfp object. + - returns the task id for this new task + !*/ + + void shutdown_pool ( + ); + /*! + ensures + - causes all threads to terminate and blocks the + caller until this happens. + !*/ + + private: + + bool is_worker_thread ( + const thread_id_type id + ) const; + /*! + requires + - m is locked + ensures + - if (thread with given id is one of the thread pool's worker threads or num_threads_in_pool() == 0) then + - returns true + - else + - returns false + !*/ + + void thread ( + ); + /*! + this is the function that executes the threads in the thread pool + !*/ + + long find_empty_task_slot ( + ) const; + /*! + requires + - m is locked + ensures + - if (there is currently a empty task slot) then + - returns the index of that task slot in tasks + - there is a task slot + - else + - returns -1 + !*/ + + long find_ready_task ( + ) const; + /*! + requires + - m is locked + ensures + - if (there is currently a task to do) then + - returns the index of that task in tasks + - else + - returns -1 + !*/ + + uint64 make_next_task_id ( + long idx + ); + /*! + requires + - m is locked + - 0 <= idx < tasks.size() + ensures + - returns the next index to be used for tasks that are placed in + tasks[idx] + !*/ + + unsigned long task_id_to_index ( + uint64 id + ) const; + /*! + requires + - m is locked + - num_threads_in_pool() != 0 + ensures + - returns the index in tasks corresponding to the given id + !*/ + + struct task_state_type + { + task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0), eptr(nullptr) {} + + bool is_ready () const + /*! + ensures + - if (is_empty() == false && no thread is currently processing this task) then + - returns true + - else + - returns false + !*/ + { + return !is_being_processed && !is_empty(); + } + + bool is_empty () const + /*! + ensures + - if (this task state is empty. i.e. it doesn't contain a task to be processed) then + - returns true + - else + - returns false + !*/ + { + return task_id == 0; + } + + bool is_being_processed; // true when a thread is working on this task + uint64 task_id; // the id of this task. 0 means this task is empty + thread_id_type thread_id; // the id of the thread that requested this task + + uint64 next_task_id; + + long arg1; + long arg2; + + member_function_pointer<> mfp0; + member_function_pointer<long> mfp1; + member_function_pointer<long,long> mfp2; + bfp_type bfp; + + std::shared_ptr<function_object_copy> function_copy; + mutable std::exception_ptr eptr; // non-null if the task threw an exception + + void propagate_exception() const + { + if (eptr) + { + auto tmp = eptr; + eptr = nullptr; + std::rethrow_exception(tmp); + } + } + + }; + + array<task_state_type> tasks; + array<thread_id_type> worker_thread_ids; + + mutex m; + signaler task_done_signaler; + signaler task_ready_signaler; + bool we_are_destructing; + + std::vector<std::thread> threads; + + // restricted functions + thread_pool_implementation(thread_pool_implementation&); // copy constructor + thread_pool_implementation& operator=(thread_pool_implementation&); // assignment operator + + }; + + +// ---------------------------------------------------------------------------------------- + + class thread_pool + { + /*! + This object is just a shell that holds a std::shared_ptr + to the real thread_pool_implementation object. The reason for doing + it this way is so that we can allow any mixture of destruction orders + between thread_pool objects and futures. Whoever gets destroyed + last cleans up the thread_pool_implementation resources. + !*/ + typedef bound_function_pointer::kernel_1a_c bfp_type; + + public: + explicit thread_pool ( + unsigned long num_threads + ) + { + impl.reset(new thread_pool_implementation(num_threads)); + } + + ~thread_pool ( + ) + { + try + { + impl->shutdown_pool(); + } + catch (std::exception& e) + { + std::cerr << "An unhandled exception was inside a dlib::thread_pool when it was destructed." << std::endl; + std::cerr << "It's what string is: \n" << e.what() << std::endl; + using namespace std; + assert(false); + abort(); + } + catch (...) + { + std::cerr << "An unhandled exception was inside a dlib::thread_pool when it was destructed." << std::endl; + using namespace std; + assert(false); + abort(); + } + } + + void wait_for_task ( + uint64 task_id + ) const { impl->wait_for_task(task_id); } + + unsigned long num_threads_in_pool ( + ) const { return impl->num_threads_in_pool(); } + + void wait_for_all_tasks ( + ) const { impl->wait_for_all_tasks(); } + + bool is_task_thread ( + ) const { return impl->is_task_thread(); } + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)() + ) + { + return impl->add_task(obj, funct); + } + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long), + long arg1 + ) + { + return impl->add_task(obj, funct, arg1); + } + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long,long), + long arg1, + long arg2 + ) + { + return impl->add_task(obj, funct, arg1, arg2); + } + + // -------------------- + + template <typename F> + uint64 add_task ( + F& function_object + ) + { + COMPILE_TIME_ASSERT(is_function<F>::value == false); + COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false); + + bfp_type temp; + temp.set(function_object); + uint64 id = impl->add_task_internal(temp); + + return id; + } + + template <typename F> + uint64 add_task_by_value ( + const F& function_object + ) + { + thread_pool_implementation::function_object_copy_instance<F>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + + bfp_type temp; + temp.set(ptr->item); + uint64 id = impl->add_task_internal(temp, function_copy); + + return id; + } + + template <typename T> + uint64 add_task ( + const T& obj, + void (T::*funct)() const + ) + { + bfp_type temp; + temp.set(obj,funct); + uint64 id = impl->add_task_internal(temp); + + return id; + } + + template <typename T> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)() const + ) + { + thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item,funct); + uint64 id = impl->add_task_internal(temp, function_copy); + + return id; + } + + template <typename T> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)() + ) + { + thread_pool_implementation::function_object_copy_instance<T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item,funct); + uint64 id = impl->add_task_internal(temp, function_copy); + + return id; + } + + uint64 add_task ( + void (*funct)() + ) + { + bfp_type temp; + temp.set(funct); + uint64 id = impl->add_task_internal(temp); + + return id; + } + + // -------------------- + + template <typename F, typename A1> + uint64 add_task ( + F& function_object, + future<A1>& arg1 + ) + { + COMPILE_TIME_ASSERT(is_function<F>::value == false); + COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false); + + bfp_type temp; + temp.set(function_object,arg1.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + template <typename F, typename A1> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1 + ) + { + thread_pool_implementation::function_object_copy_instance<F>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, arg1.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1> + uint64 add_task ( + T& obj, + void (T::*funct)(T1), + future<A1>& arg1 + ) + { + bfp_type temp; + temp.set(obj,funct,arg1.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1), + future<A1>& arg1 + ) + { + thread_pool_implementation::function_object_copy_instance<T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item,funct,arg1.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + + template <typename T, typename T1, typename A1> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1) const, + future<A1>& arg1 + ) + { + bfp_type temp; + temp.set(obj,funct,arg1.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1) const, + future<A1>& arg1 + ) + { + thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item,funct,arg1.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + template <typename T1, typename A1> + uint64 add_task ( + void (*funct)(T1), + future<A1>& arg1 + ) + { + bfp_type temp; + temp.set(funct,arg1.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + return id; + } + + // -------------------- + + template <typename F, typename A1, typename A2> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2 + ) + { + COMPILE_TIME_ASSERT(is_function<F>::value == false); + COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false); + + bfp_type temp; + temp.set(function_object, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename F, typename A1, typename A2> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2 + ) + { + thread_pool_implementation::function_object_copy_instance<F>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ) + { + thread_pool_implementation::function_object_copy_instance<T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2) const, + future<A1>& arg1, + future<A2>& arg2 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2) const, + future<A1>& arg1, + future<A2>& arg2 + ) + { + thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + template <typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + void (*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ) + { + bfp_type temp; + temp.set(funct, arg1.get(), arg2.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + return id; + } + + // -------------------- + + template <typename F, typename A1, typename A2, typename A3> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + COMPILE_TIME_ASSERT(is_function<F>::value == false); + COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false); + + bfp_type temp; + temp.set(function_object, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename F, typename A1, typename A2, typename A3> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + thread_pool_implementation::function_object_copy_instance<F>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + thread_pool_implementation::function_object_copy_instance<T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2,T3) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + template <typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + void (*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ) + { + bfp_type temp; + temp.set(funct, arg1.get(), arg2.get(), arg3.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + return id; + } + + // -------------------- + + template <typename F, typename A1, typename A2, typename A3, typename A4> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + COMPILE_TIME_ASSERT(is_function<F>::value == false); + COMPILE_TIME_ASSERT(is_pointer_type<F>::value == false); + + bfp_type temp; + temp.set(function_object, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename F, typename A1, typename A2, typename A3, typename A4> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + thread_pool_implementation::function_object_copy_instance<F>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<F>(function_object); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the future to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + thread_pool_implementation::function_object_copy_instance<T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + bfp_type temp; + temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + thread_pool_implementation::function_object_copy_instance<const T>* ptr = 0; + ptr = new thread_pool_implementation::function_object_copy_instance<const T>(obj); + std::shared_ptr<thread_pool_implementation::function_object_copy> function_copy(ptr); + + bfp_type temp; + temp.set(ptr->item, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp, function_copy); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + template <typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + void (*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ) + { + bfp_type temp; + temp.set(funct, arg1.get(), arg2.get(), arg3.get(), arg4.get()); + uint64 id = impl->add_task_internal(temp); + + // tie the futures to this task + arg1.task_id = id; + arg1.tp = impl; + arg2.task_id = id; + arg2.tp = impl; + arg3.task_id = id; + arg3.tp = impl; + arg4.task_id = id; + arg4.tp = impl; + return id; + } + + private: + + std::shared_ptr<thread_pool_implementation> impl; + + // restricted functions + thread_pool(thread_pool&); // copy constructor + thread_pool& operator=(thread_pool&); // assignment operator + + }; + + +// ---------------------------------------------------------------------------------------- + + template <typename T> + void future<T>:: + wait ( + ) const + { + if (tp) + { + tp->wait_for_task(task_id); + tp.reset(); + task_id = 0; + } + } + +} + +// ---------------------------------------------------------------------------------------- + +#ifdef NO_MAKEFILE +#include "thread_pool_extension.cpp" +#endif + +#endif // DLIB_THREAD_POOl_Hh_ + + diff --git a/ml/dlib/dlib/threads/thread_pool_extension_abstract.h b/ml/dlib/dlib/threads/thread_pool_extension_abstract.h new file mode 100644 index 000000000..ba54a7546 --- /dev/null +++ b/ml/dlib/dlib/threads/thread_pool_extension_abstract.h @@ -0,0 +1,842 @@ +// Copyright (C) 2008 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREAD_POOl_ABSTRACT_Hh_ +#ifdef DLIB_THREAD_POOl_ABSTRACT_Hh_ + +#include "threads_kernel_abstract.h" +#include "../uintn.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + class future + { + /*! + INITIAL VALUE + - is_ready() == true + + WHAT THIS OBJECT REPRESENTS + This object represents a container that allows you to safely pass objects + into the tasks performed by the thread_pool object defined below. An + example will make it clear: + + // Suppose you have a global function defined as follows + void add (int a, int b, int& result) { result = a + b; } + + // Also suppose you have a thread_pool named tp defined somewhere. + // Then you could do the following. + future<int> a, b, result; + a = 3; + b = 4; + // this function call causes another thread to execute a call to the add() function + // and passes in the int objects contained in a, b, and result + tp.add_task(add,a,b,result); + // This line will wait for the task in the thread pool to finish and then print the + // value in the result integer. So it will print a 7. + cout << result << endl; + !*/ + + public: + future ( + ); + /*! + ensures + - The object of type T contained in this future has + an initial value for its type. + - #is_ready() == true + !*/ + + future ( + const T& item + ); + /*! + ensures + - #get() == item + - #is_ready() == true + !*/ + + future ( + const future& item + ); + /*! + ensures + - if (item.is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to the item future has finished. + - #is_ready() == true + - #item.is_ready() == true + - #get() == item.get() + !*/ + + ~future ( + ); + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + !*/ + + bool is_ready ( + ) const; + /*! + ensures + - if (the value of this future may not yet be ready to be accessed because it + is in use by a task in a thread_pool) then + - returns false + - else + - returns true + !*/ + + future& operator=( + const T& item + ); + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + - #is_ready() == true + - #get() == item + - returns *this + !*/ + + future& operator=( + const future& item + ); + /*! + ensures + - if (is_ready() == false || item.is_ready() == false) then + - the call to this function blocks until the threads processing the tasks related + to this future and the item future have finished. + - #is_ready() == true + - #item.is_ready() == true + - #get() == item.get() + - returns *this + !*/ + + operator T& ( + ); + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + - #is_ready() == true + - returns get() + !*/ + + operator const T& ( + ) const; + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + - #is_ready() == true + - returns get() + !*/ + + T& get ( + ); + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + - #is_ready() == true + - returns a non-const reference to the object of type T contained inside this future + !*/ + + const T& get ( + ) const; + /*! + ensures + - if (is_ready() == false) then + - the call to this function blocks until the thread processing the task related + to this future has finished. + - #is_ready() == true + - returns a const reference to the object of type T contained inside this future + !*/ + + }; + +// ---------------------------------------------------------------------------------------- + + template <typename T> + inline void swap ( + future<T>& a, + future<T>& b + ) { std::swap(a.get(), b.get()); } + /*! + provides a global swap function + !*/ + +// ---------------------------------------------------------------------------------------- + + +// The future object comes with overloads for all the usual comparison operators. + + template <typename T> bool operator== (const future<T>& a, const future<T>& b) { return a.get() == b.get(); } + template <typename T> bool operator!= (const future<T>& a, const future<T>& b) { return a.get() != b.get(); } + template <typename T> bool operator<= (const future<T>& a, const future<T>& b) { return a.get() <= b.get(); } + template <typename T> bool operator>= (const future<T>& a, const future<T>& b) { return a.get() >= b.get(); } + template <typename T> bool operator< (const future<T>& a, const future<T>& b) { return a.get() < b.get(); } + template <typename T> bool operator> (const future<T>& a, const future<T>& b) { return a.get() > b.get(); } + + template <typename T> bool operator== (const future<T>& a, const T& b) { return a.get() == b; } + template <typename T> bool operator== (const T& a, const future<T>& b) { return a == b.get(); } + template <typename T> bool operator!= (const future<T>& a, const T& b) { return a.get() != b; } + template <typename T> bool operator!= (const T& a, const future<T>& b) { return a != b.get(); } + template <typename T> bool operator<= (const future<T>& a, const T& b) { return a.get() <= b; } + template <typename T> bool operator<= (const T& a, const future<T>& b) { return a <= b.get(); } + template <typename T> bool operator>= (const future<T>& a, const T& b) { return a.get() >= b; } + template <typename T> bool operator>= (const T& a, const future<T>& b) { return a >= b.get(); } + template <typename T> bool operator< (const future<T>& a, const T& b) { return a.get() < b; } + template <typename T> bool operator< (const T& a, const future<T>& b) { return a < b.get(); } + template <typename T> bool operator> (const future<T>& a, const T& b) { return a.get() > b; } + template <typename T> bool operator> (const T& a, const future<T>& b) { return a > b.get(); } + +// ---------------------------------------------------------------------------------------- + + class thread_pool + { + /*! + WHAT THIS OBJECT REPRESENTS + This object represents a fixed size group of threads which you can + submit tasks to and then wait for those tasks to be completed. + + Note that setting the number of threads to 0 is a valid way to + use this object. It causes it to not contain any threads + at all. When tasks are submitted to the object in this mode + the tasks are processed within the calling thread. So in this + mode any thread that calls add_task() is considered to be + a thread_pool thread capable of executing tasks. + + This object is also implemented such that no memory allocations occur + after the thread_pool has been constructed so long as the user doesn't + call any of the add_task_by_value() routines. The future object also + doesn't perform any memory allocations or contain any system resources + such as mutex objects. + + EXCEPTIONS + Note that if an exception is thrown inside a task thread and is not caught + then the exception will be trapped inside the thread pool and rethrown at a + later time when someone calls one of the add task or wait member functions + of the thread pool. This allows exceptions to propagate out of task threads + and into the calling code where they can be handled. + !*/ + + public: + explicit thread_pool ( + unsigned long num_threads + ); + /*! + ensures + - #num_threads_in_pool() == num_threads + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + ~thread_pool( + ); + /*! + ensures + - blocks until all tasks in the pool have finished. + - If one of the threads has generated an exception but it hasn't yet been + rethrown to the caller (e.g. by calling wait_for_all_tasks()) then the + program will be terminated. So make sure you handle all the possible + exceptions from your tasks. + !*/ + + bool is_task_thread ( + ) const; + /*! + ensures + - if (the thread calling this function is one of the threads in this + thread pool or num_threads_in_pool() == 0) then + - returns true + - else + - returns false + !*/ + + unsigned long num_threads_in_pool ( + ) const; + /*! + ensures + - returns the number of threads contained in this thread pool. That is, returns + the maximum number of tasks that this object will process concurrently. + !*/ + + template <typename F> + uint64 add_task_by_value ( + const F& function_object + ); + /*! + requires + - function_object() is a valid expression + ensures + - makes a copy of function_object, call it FCOPY. + - if (is_task_thread() == true and there aren't any free threads available) then + - calls FCOPY() within the calling thread and returns when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls FCOPY(). + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)() + ); + /*! + requires + - funct == a valid member function pointer for class T + - obj will not go out of scope until after the task has completed (i.e. + this function passes obj to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (obj.*funct)() within the calling thread and returns + when it finishes. + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.*funct)() + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)() + ); + /*! + requires + - funct == a valid member function pointer for class T + ensures + - makes a copy of obj, call it OBJ_COPY. + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (OBJ_COPY.*funct)() within the calling thread and returns + when it finishes. + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (OBJ_COPY.*funct)(). + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long), + long arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + - obj will not go out of scope until after the task has completed (i.e. + this function passes obj to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (obj.*funct)(arg1) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.*funct)(arg1) + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T> + uint64 add_task ( + T& obj, + void (T::*funct)(long,long), + long arg1, + long arg2 + ); + /*! + requires + - funct == a valid member function pointer for class T + - obj will not go out of scope until after the task has completed (i.e. + this function passes obj to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (obj.*funct)(arg1,arg2) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.*funct)(arg1,arg2) + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + void wait_for_task ( + uint64 task_id + ) const; + /*! + ensures + - if (there is currently a task with the given id being executed in the thread pool) then + - the call to this function blocks until the task with the given id is complete + - else + - the call to this function returns immediately + !*/ + + void wait_for_all_tasks ( + ) const; + /*! + ensures + - the call to this function blocks until all tasks which were submitted + to the thread pool by the thread that is calling this function have + finished. + !*/ + + // -------------------- + + template <typename F, typename A1> + uint64 add_task ( + F& function_object, + future<A1>& arg1 + ); + /*! + requires + - function_object(arg1.get()) is a valid expression + (i.e. The A1 type stored in the future must be a type that can be passed into the given function object) + - function_object will not go out of scope until after the task has completed (i.e. + this function passes function_object to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls function_object(arg1.get()) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls function_object(arg1.get()). + - #arg1.is_ready() == false + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename F, typename A1> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1 + ); + /*! + requires + - function_object(arg1.get()) is a valid expression + (i.e. The A1 type stored in the future must be a type that can be passed into the given function object) + ensures + - makes a copy of function_object, call it FCOPY. + - if (is_task_thread() == true and there aren't any free threads available) then + - calls FCOPY(arg1.get()) within the calling thread and returns when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls FCOPY(arg1.get()). + - #arg1.is_ready() == false + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T, typename T1, typename A1> + uint64 add_task ( + T& obj, + void (T::*funct)(T1), + future<A1>& arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + - (obj.*funct)(arg1.get()) must be a valid expression. + (i.e. The A1 type stored in the future must be a type that can be passed into the given function) + - obj will not go out of scope until after the task has completed (i.e. + this function passes obj to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (obj.*funct)(arg1.get()) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.*funct)(arg1.get()). + - #arg1.is_ready() == false + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T, typename T1, typename A1> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1), + future<A1>& arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + - (obj.*funct)(arg1.get()) must be a valid expression. + (i.e. The A1 type stored in the future must be a type that can be passed into the given function) + ensures + - makes a copy of obj, call it OBJ_COPY. + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (OBJ_COPY.*funct)(arg1.get()) within the calling thread and returns + when it finishes. + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (OBJ_COPY.*funct)(arg1.get()). + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T, typename T1, typename A1> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1) const, + future<A1>& arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + - (obj.*funct)(arg1.get()) must be a valid expression. + (i.e. The A1 type stored in the future must be a type that can be passed into the given function) + - obj will not go out of scope until after the task has completed (i.e. + this function passes obj to the task by reference. If you want to avoid + this restriction then use add_task_by_value()) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (obj.*funct)(arg1.get()) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (obj.*funct)(arg1.get()). + - #arg1.is_ready() == false + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T, typename T1, typename A1> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1) const, + future<A1>& arg1 + ); + /*! + requires + - funct == a valid member function pointer for class T + - (obj.*funct)(arg1.get()) must be a valid expression. + (i.e. The A1 type stored in the future must be a type that can be passed into the given function) + ensures + - makes a copy of obj, call it OBJ_COPY. + - if (is_task_thread() == true and there aren't any free threads available) then + - calls (OBJ_COPY.*funct)(arg1.get()) within the calling thread and returns + when it finishes. + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls (OBJ_COPY.*funct)(arg1.get()). + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + template <typename T1, typename A1> + uint64 add_task ( + void (*funct)(T1), + future<A1>& arg1 + ); + /*! + requires + - funct == a valid function pointer + - (funct)(arg1.get()) must be a valid expression. + (i.e. The A1 type stored in the future must be a type that can be passed into the given function) + ensures + - if (is_task_thread() == true and there aren't any free threads available) then + - calls funct(arg1.get()) within the calling thread and returns + when it finishes + - else + - the call to this function blocks until there is a free thread in the pool + to process this new task. Once a free thread is available the task + is handed off to that thread which then calls funct(arg1.get()). + - #arg1.is_ready() == false + - returns a task id that can be used by this->wait_for_task() to wait + for the submitted task to finish. + !*/ + + // -------------------------------------------------------------------------------- + // The remainder of this class just contains overloads for add_task() and add_task_by_value() + // that take up to 4 futures (as well as 0 futures). Their behavior is identical to the above + // add_task() and add_task_by_value() functions. + // -------------------------------------------------------------------------------- + + template <typename F, typename A1, typename A2> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2 + ); + + template <typename F, typename A1, typename A2> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ); + + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2) const, + future<A1>& arg1, + future<A2>& arg2 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2) const, + future<A1>& arg1, + future<A2>& arg2 + ); + + template <typename T1, typename A1, + typename T2, typename A2> + uint64 add_task ( + void (*funct)(T1,T2), + future<A1>& arg1, + future<A2>& arg2 + ); + + // -------------------- + + template <typename F, typename A1, typename A2, typename A3> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename F, typename A1, typename A2, typename A3> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2,T3) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + template <typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3> + uint64 add_task ( + void (*funct)(T1,T2,T3), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3 + ); + + // -------------------- + + template <typename F, typename A1, typename A2, typename A3, typename A4> + uint64 add_task ( + F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename F, typename A1, typename A2, typename A3, typename A4> + uint64 add_task_by_value ( + const F& function_object, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + T& obj, + void (T::*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename T, typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)(T1,T2,T3,T4) const, + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + template <typename T1, typename A1, + typename T2, typename A2, + typename T3, typename A3, + typename T4, typename A4> + uint64 add_task ( + void (*funct)(T1,T2,T3,T4), + future<A1>& arg1, + future<A2>& arg2, + future<A3>& arg3, + future<A4>& arg4 + ); + + // -------------------- + + template <typename F> + uint64 add_task ( + F& function_object + ); + + template <typename T> + uint64 add_task ( + const T& obj, + void (T::*funct)() const, + ); + + template <typename T> + uint64 add_task_by_value ( + const T& obj, + void (T::*funct)() const + ); + + uint64 add_task ( + void (*funct)() + ); + + // -------------------- + + private: + + // restricted functions + thread_pool(thread_pool&); // copy constructor + thread_pool& operator=(thread_pool&); // assignment operator + }; + +} + +// ---------------------------------------------------------------------------------------- + +#endif // DLIB_THREAD_POOl_ABSTRACT_Hh_ + + + diff --git a/ml/dlib/dlib/threads/thread_specific_data_extension.h b/ml/dlib/dlib/threads/thread_specific_data_extension.h new file mode 100644 index 000000000..0b5339200 --- /dev/null +++ b/ml/dlib/dlib/threads/thread_specific_data_extension.h @@ -0,0 +1,141 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ +#define DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ + +#include "thread_specific_data_extension_abstract.h" +#include "threads_kernel_abstract.h" +#include "../binary_search_tree.h" +#include "auto_mutex_extension.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + class thread_specific_data + { + /*! + CONVENTION + - for all valid ID: + (*items[ID]) == pointer to the data for thread with id ID + !*/ + public: + + thread_specific_data ( + ) + { + thread_end_handler_calls_left = 0; + } + + ~thread_specific_data ( + ) + { + // We should only call the unregister_thread_end_handler function if there are + // some outstanding callbacks we expect to get. Otherwise lets avoid calling it + // since the dlib state that maintains the registered thread end handlers may have + // been destructed already (since the program might be in the process of terminating). + bool call_unregister = false; + m.lock(); + if (thread_end_handler_calls_left > 0) + call_unregister = true; + m.unlock(); + + if (call_unregister) + unregister_thread_end_handler(const_cast<thread_specific_data&>(*this),&thread_specific_data::thread_end_handler); + + auto_mutex M(m); + items.reset(); + while (items.move_next()) + { + delete items.element().value(); + } + } + + inline T& data ( + ) { return get_data(); } + + inline const T& data ( + ) const { return get_data(); } + + private: + + T& get_data ( + ) const + { + thread_id_type id = get_thread_id(); + auto_mutex M(m); + + T** item = items[id]; + if (item) + { + return **item; + } + else + { + // register an end handler for this thread so long as it is a dlib created thread. + T* new_item = new T; + + bool in_tree = false; + try + { + T* temp_item = new_item; + thread_id_type temp_id = id; + items.add(temp_id,temp_item); + in_tree = true; + + if (is_dlib_thread(id)) + { + register_thread_end_handler(const_cast<thread_specific_data&>(*this),&thread_specific_data::thread_end_handler); + ++thread_end_handler_calls_left; + } + } + catch (...) + { + if (in_tree) + { + items.destroy(id); + } + delete new_item; + throw; + } + + return *new_item; + } + } + + void thread_end_handler ( + ) + { + const thread_id_type id = get_thread_id(); + thread_id_type junk = 0; + T* item = 0; + auto_mutex M(m); + --thread_end_handler_calls_left; + if (items[id]) + { + items.remove(id,junk,item); + delete item; + } + } + + mutable typename binary_search_tree<thread_id_type,T*>::kernel_2a items; + mutex m; + mutable long thread_end_handler_calls_left; + + // restricted functions + thread_specific_data(thread_specific_data&); // copy constructor + thread_specific_data& operator=(thread_specific_data&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ + + + diff --git a/ml/dlib/dlib/threads/thread_specific_data_extension_abstract.h b/ml/dlib/dlib/threads/thread_specific_data_extension_abstract.h new file mode 100644 index 000000000..03fb9ddaa --- /dev/null +++ b/ml/dlib/dlib/threads/thread_specific_data_extension_abstract.h @@ -0,0 +1,87 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + class thread_specific_data + { + /*! + WHAT THIS OBJECT REPRESENTS + This object represents a container of thread specific data. When + a thread calls the data() member function it gets a reference to a T object + that is specific to its own thread. Each subsequent call to data() from that + thread returns the same instance. Also note that when a thread ends + the instance of its data() object gets destroyed and freed (if the thread + was created by the dlib library). So any pointers or references to the object + will be invalid after the thread has ended. + !*/ + public: + + thread_specific_data ( + ); + /*! + ensures + - #*this is properly initialized + !*/ + + ~thread_specific_data ( + ); + /*! + ensures + - all resources allocated by *this have been freed. This includes + all the thread specific data returned by the data() functions. + !*/ + + T& data ( + ); + /*! + ensures + - if (the calling thread has NOT called this->data() before) then + - constructs an instance of T that is specific to the calling + thread. + - returns a reference to the T instance that was constructed for + the calling thread. + throws + - std::bad_alloc or any exception thrown by T's constructor + If an exception is thrown then the call to data() will have + no effect on *this. + !*/ + + const T& data ( + ) const; + /*! + ensures + - if (the calling thread has NOT called this->data() before) then + - constructs an instance of T that is specific to the calling + thread. + - returns a const reference to the T instance that was constructed for + the calling thread. + throws + - std::bad_alloc or any exception thrown by T's constructor + If an exception is thrown then the call to data() will have + no effect on *this. + !*/ + + private: + // restricted functions + thread_specific_data(thread_specific_data&); // copy constructor + thread_specific_data& operator=(thread_specific_data&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREAD_SPECIFIC_DATA_EXTENSIOn_ABSTRACT_ + + diff --git a/ml/dlib/dlib/threads/threaded_object_extension.cpp b/ml/dlib/dlib/threads/threaded_object_extension.cpp new file mode 100644 index 000000000..a7326c11d --- /dev/null +++ b/ml/dlib/dlib/threads/threaded_object_extension.cpp @@ -0,0 +1,290 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADED_OBJECT_EXTENSIOn_CPP +#define DLIB_THREADED_OBJECT_EXTENSIOn_CPP + +#include "threaded_object_extension.h" +#include "create_new_thread_extension.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + threaded_object:: + threaded_object ( + ): + s(m_), + id1(0), + is_running_(false), + is_alive_(false), + should_stop_(false), + id_valid(false) + { + } + +// ---------------------------------------------------------------------------------------- + + threaded_object:: + ~threaded_object ( + ) + { + try + { + DLIB_ASSERT(is_alive() == false, + "\tthreaded_object::~threaded_object()" + << "\n\tYou have let a threaded object destruct itself before terminating its thread" + << "\n\tthis: " << this + ); + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + assert(false); + abort(); + } + } + +// ---------------------------------------------------------------------------------------- + + bool threaded_object:: + is_running ( + ) const + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tbool threaded_object::is_running()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + return is_running_; + } + +// ---------------------------------------------------------------------------------------- + + bool threaded_object:: + is_alive ( + ) const + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tbool threaded_object::is_alive()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + return is_alive_; + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + wait ( + ) const + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::wait()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + while (is_alive_) + s.wait(); + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + start ( + ) + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::start()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + if (is_alive_ == false) + { + if (create_new_thread<threaded_object,&threaded_object::thread_helper>(*this) == false) + { + is_running_ = false; + throw thread_error(); + } + } + is_alive_ = true; + is_running_ = true; + should_stop_ = false; + s.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + restart ( + ) + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::restart()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + if (is_alive_ == false) + { + if (create_new_thread<threaded_object,&threaded_object::thread_helper>(*this) == false) + { + is_running_ = false; + throw thread_error(); + } + should_respawn_ = false; + } + else + { + should_respawn_ = true; + } + is_alive_ = true; + is_running_ = true; + should_stop_ = false; + s.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + set_respawn ( + ) + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::set_respawn()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + should_respawn_ = true; + } + +// ---------------------------------------------------------------------------------------- + + bool threaded_object:: + should_respawn ( + ) const + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tbool threaded_object::should_respawn()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + return should_respawn_; + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + pause ( + ) + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::pause()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + is_running_ = false; + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + stop ( + ) + { + auto_mutex M(m_); + + DLIB_ASSERT(id1 != get_thread_id() || id_valid == false, + "\tvoid threaded_object::stop()" + << "\n\tYou can NOT call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + + should_stop_ = true; + is_running_ = false; + should_respawn_ = false; + s.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + bool threaded_object:: + should_stop ( + ) const + { + auto_mutex M(m_); + DLIB_ASSERT(is_alive_ && id1 == get_thread_id() && id_valid == true, + "\tbool threaded_object::should_stop()" + << "\n\tYou can only call this function from the thread that executes threaded_object::thread" + << "\n\tthis: " << this + ); + while (is_running_ == false && should_stop_ == false) + s.wait(); + return should_stop_; + } + +// ---------------------------------------------------------------------------------------- + + void threaded_object:: + thread_helper( + ) + { +#ifdef ENABLE_ASSERTS + id1 = get_thread_id(); + id_valid = true; +#endif + while (true) + { + m_.lock(); + should_respawn_ = false; + m_.unlock(); + + thread(); + + auto_mutex M(m_); + + if (should_respawn_) + continue; + +#ifdef ENABLE_ASSERTS + id_valid = false; +#endif + + is_alive_ = false; + is_running_ = false; + should_stop_ = false; + s.broadcast(); + + return; + } + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREADED_OBJECT_EXTENSIOn_CPP + diff --git a/ml/dlib/dlib/threads/threaded_object_extension.h b/ml/dlib/dlib/threads/threaded_object_extension.h new file mode 100644 index 000000000..dcf00daea --- /dev/null +++ b/ml/dlib/dlib/threads/threaded_object_extension.h @@ -0,0 +1,123 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADED_OBJECT_EXTENSIOn_ +#define DLIB_THREADED_OBJECT_EXTENSIOn_ + +#include "threaded_object_extension_abstract.h" +#include "threads_kernel.h" +#include "auto_mutex_extension.h" +#include "../algs.h" +#include "../assert.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class threaded_object + { + /*! + INITIAL VALUE + - is_running_ == false + - is_alive_ == false + - should_stop_ == false + - should_respawn_ == false + +#ifdef ENABLE_ASSERTS + - id_valid == false + - id1 == get_main_thread_id() +#endif + + CONVENTION + - is_running() == is_running_ + - is_alive() == is_alive_ + - should_stop() == should_stop_ + - should_respawn() == should_respawn_ + + +#ifdef ENABLE_ASSERTS + - if (when thread() is executing) then + - id1 == the id of the running thread + - id_valid == true + - else + - id1 == an undefined value + - id_valid == false +#endif + + - m_ == the mutex used to protect all our variables + - s == the signaler for m_ + !*/ + + public: + + threaded_object ( + ); + + virtual ~threaded_object ( + ); + + bool is_running ( + ) const; + + bool is_alive ( + ) const; + + void wait ( + ) const; + + void start ( + ); + + void restart ( + ); + + void set_respawn ( + ); + + bool should_respawn ( + ) const; + + void pause ( + ); + + void stop ( + ); + + protected: + + bool should_stop ( + ) const; + + private: + + void thread_helper( + ); + + virtual void thread ( + ) = 0; + + mutex m_; + signaler s; + thread_id_type id1; + bool is_running_; + bool is_alive_; + bool should_stop_; + bool should_respawn_; + bool id_valid; + + // restricted functions + threaded_object(threaded_object&); // copy constructor + threaded_object& operator=(threaded_object&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#ifdef NO_MAKEFILE +#include "threaded_object_extension.cpp" +#endif + +#endif // DLIB_THREADED_OBJECT_EXTENSIOn_ + + diff --git a/ml/dlib/dlib/threads/threaded_object_extension_abstract.h b/ml/dlib/dlib/threads/threaded_object_extension_abstract.h new file mode 100644 index 000000000..32a8fbc31 --- /dev/null +++ b/ml/dlib/dlib/threads/threaded_object_extension_abstract.h @@ -0,0 +1,199 @@ +// Copyright (C) 2007 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREADED_OBJECT_EXTENSIOn_ABSTRACT_ +#ifdef DLIB_THREADED_OBJECT_EXTENSIOn_ABSTRACT_ + +#include "threads_kernel_abstract.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + class threaded_object + { + /*! + INITIAL VALUE + - is_running() == false + - is_alive() == false + - should_respawn() == false + + WHAT THIS OBJECT REPRESENTS + This object represents a simple threaded object. To use it you inherit + from it and define the thread() function. Then when you call start() + it will spawn a thread that calls this->thread(). + !*/ + public: + + threaded_object ( + ); + /*! + ensures + - #*this is properly initialized + throws + - std::bad_alloc + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create threading objects. + !*/ + + virtual ~threaded_object ( + ); + /*! + requires + - is_alive() == false + (i.e. in the destructor for the object you derive from this one you + must wait for this->thread() to end.) + ensures + - all resources allocated by *this have been freed. + !*/ + + bool is_running ( + ) const; + /*! + requires + - is not called from this->thread() + ensures + - if (is_alive() && this->thread() is currently supposed to be executing) then + - returns true + - else + - returns false + !*/ + + bool is_alive ( + ) const; + /*! + requires + - is not called from this->thread() + ensures + - if (this->thread() has been called by some thread and has yet to terminate) then + - returns true + - else + - returns false + !*/ + + void wait ( + ) const; + /*! + requires + - is not called from this->thread() + ensures + - if (is_alive() == true) then + - blocks until this->thread() terminates + !*/ + + void start ( + ); + /*! + requires + - is not called from this->thread() + ensures + - #is_alive() == true + - #is_running() == true + - #should_stop() == false + throws + - std::bad_alloc or dlib::thread_error + If either of these exceptions are thrown then + #is_alive() == false and #is_running() == false + !*/ + + void set_respawn ( + ); + /*! + requires + - is not called from this->thread() + ensures + - #should_respawn() == true + !*/ + + bool should_respawn ( + ) const; + /*! + requires + - is not called from this->thread() + ensures + - returns true if the thread will automatically restart upon termination and + false otherwise. Note that every time a thread starts it sets should_respawn() + back to false. Therefore, a single call to set_respawn() can cause at most + one respawn to occur. + !*/ + + void restart ( + ); + /*! + requires + - is not called from this->thread() + ensures + - This function atomically executes set_respawn() and start(). The precise meaning of this + is defined below. + - if (is_alive()) then + - #should_respawn() == true + - else + - #should_respawn() == false + - #is_alive() == true + - #is_running() == true + - #should_stop() == false + throws + - std::bad_alloc or dlib::thread_error + If either of these exceptions are thrown then + #is_alive() == false and #is_running() == false + !*/ + + void pause ( + ); + /*! + requires + - is not called from this->thread() + ensures + - #is_running() == false + !*/ + + void stop ( + ); + /*! + requires + - is not called from this->thread() + ensures + - #should_stop() == true + - #is_running() == false + - #should_respawn() == false + !*/ + + protected: + + bool should_stop ( + ) const; + /*! + requires + - is only called from the thread that executes this->thread() + ensures + - calls to this function block until (#is_running() == true || #should_stop() == true) + - if (this thread is supposed to terminate) then + - returns true + - else + - returns false + !*/ + + private: + + virtual void thread ( + ) = 0; + /*! + requires + - is executed in its own thread + - is only executed in one thread at a time + throws + - does not throw any exceptions + !*/ + + // restricted functions + threaded_object(threaded_object&); // copy constructor + threaded_object& operator=(threaded_object&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREADED_OBJECT_EXTENSIOn_ABSTRACT_ + diff --git a/ml/dlib/dlib/threads/threads_kernel.h b/ml/dlib/dlib/threads/threads_kernel.h new file mode 100644 index 000000000..77cb16d92 --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel.h @@ -0,0 +1,18 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADs_KERNEL_ +#define DLIB_THREADs_KERNEL_ + +#include "../platform.h" + +#ifdef WIN32 +#include "windows.h" +#endif + +#ifndef WIN32 +#include "posix.h" +#endif + +#endif // DLIB_THREADs_KERNEL_ + + diff --git a/ml/dlib/dlib/threads/threads_kernel_1.cpp b/ml/dlib/dlib/threads/threads_kernel_1.cpp new file mode 100644 index 000000000..cb36b8d3f --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_1.cpp @@ -0,0 +1,83 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEL_1_CPp_ +#define DLIB_THREADS_KERNEL_1_CPp_ + +#include "../platform.h" + +#ifdef WIN32 + +#include "threads_kernel_1.h" + +#include <process.h> + + +namespace dlib +{ + namespace threads_kernel_shared_helpers + { + + // ----------------------------------------------------------------------------------- + + struct info + { + void* param; + void (*funct)(void*); + }; + + // ----------------------------------------------------------------------------------- + + unsigned int __stdcall thread_starter ( + void* param + ) + { + info* alloc_p = static_cast<info*>(param); + info p = *alloc_p; + delete alloc_p; + + p.funct(p.param); + return 0; + } + + // ----------------------------------------------------------------------------------- + + bool spawn_thread ( + void (*funct)(void*), + void* param + ) + { + info* p; + try { p = new info; } + catch (...) { return false; } + + p->funct = funct; + p->param = param; + + + unsigned int garbage; + + HANDLE thandle = (HANDLE)_beginthreadex (NULL,0,thread_starter,p,0,&garbage); + // make thread and add it to the pool + + // return false if _beginthreadex didn't work + if ( thandle == 0) + { + delete p; + return false; + } + + // throw away the thread handle + CloseHandle(thandle); + return true; + } + + // ----------------------------------------------------------------------------------- + + } + +} + +#endif // WIN32 + +#endif // DLIB_THREADS_KERNEL_1_CPp_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_1.h b/ml/dlib/dlib/threads/threads_kernel_1.h new file mode 100644 index 000000000..586a21b7e --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_1.h @@ -0,0 +1,158 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEl_1_ +#define DLIB_THREADS_KERNEl_1_ + +#ifdef DLIB_ISO_CPP_ONLY +#error "DLIB_ISO_CPP_ONLY is defined so you can't use this OS dependent code. Turn DLIB_ISO_CPP_ONLY off if you want to use it." +#endif + +#include "threads_kernel_abstract.h" + +#include "../windows_magic.h" +#include <windows.h> +#include "../algs.h" +#include <condition_variable> +#include <mutex> +#include <chrono> + + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + typedef DWORD thread_id_type; + + inline thread_id_type get_thread_id ( + ) + { + return GetCurrentThreadId(); + } + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // mutex object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + // forward declaration of signaler + class signaler; + + class mutex + { + public: + + mutex ( + ) + { + } + + ~mutex ( + ) { } + + void lock ( + ) const { cs.lock(); } + + void unlock ( + ) const { cs.unlock(); } + + private: + + friend class signaler; + + mutable std::mutex cs; + + // restricted functions + mutex(mutex&); // copy constructor + mutex& operator=(mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // signaler object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + class signaler + { + + public: + signaler ( + const mutex& associated_mutex + ) : + m(associated_mutex) + { + + } + + ~signaler ( + ) { } + + void wait ( + ) const + { + std::unique_lock<std::mutex> cs(m.cs, std::defer_lock); + cv.wait(cs); + } + + bool wait_or_timeout ( + unsigned long milliseconds + ) const + { + std::unique_lock<std::mutex> cs(m.cs, std::defer_lock); + auto status = cv.wait_until(cs, std::chrono::system_clock::now() + std::chrono::milliseconds(milliseconds)); + return status == std::cv_status::no_timeout; + } + + void signal ( + ) const + { + cv.notify_one(); + } + + void broadcast ( + ) const + { + cv.notify_all(); + } + + const mutex& get_mutex ( + ) const { return m; } + + private: + + mutable std::condition_variable cv; + + const mutex& m; + + // restricted functions + signaler(signaler&); // copy constructor + signaler& operator=(signaler&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + + namespace threads_kernel_shared_helpers + { + bool spawn_thread ( + void (*funct)(void*), + void* param + ); + /*! + is identical to create_new_thread() but just doesn't use any thread pooling. + !*/ + } + +// ---------------------------------------------------------------------------------------- + +} + +#include "threads_kernel_shared.h" + +#ifdef NO_MAKEFILE +#include "threads_kernel_1.cpp" +#endif + +#endif // DLIB_THREADS_KERNEl_1_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_2.cpp b/ml/dlib/dlib/threads/threads_kernel_2.cpp new file mode 100644 index 000000000..06fb80d00 --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_2.cpp @@ -0,0 +1,75 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEL_2_CPp_ +#define DLIB_THREADS_KERNEL_2_CPp_ + +#include "../platform.h" + +#ifdef POSIX + +#include "threads_kernel_2.h" + + +namespace dlib +{ + namespace threads_kernel_shared_helpers + { + + // ----------------------------------------------------------------------------------- + + struct info + { + void* param; + void (*funct)(void*); + }; + + // ----------------------------------------------------------------------------------- + + void* thread_starter ( + void* param + ) + { + info* alloc_p = static_cast<info*>(param); + info p = *alloc_p; + delete alloc_p; + + // detach self + pthread_detach(pthread_self()); + + p.funct(p.param); + return 0; + } + + // ----------------------------------------------------------------------------------- + + bool spawn_thread ( + void (*funct)(void*), + void* param + ) + { + info* p; + try { p = new info; } + catch (...) { return false; } + + p->funct = funct; + p->param = param; + + pthread_t thread_id; + if ( pthread_create (&thread_id, 0, thread_starter, p) ) + { + delete p; + return false; + } + return true; + } + + // ----------------------------------------------------------------------------------- + + } + +} + +#endif // POSIX + +#endif // DLIB_THREADS_KERNEL_2_CPp_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_2.h b/ml/dlib/dlib/threads/threads_kernel_2.h new file mode 100644 index 000000000..209142131 --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_2.h @@ -0,0 +1,180 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEl_2_ +#define DLIB_THREADS_KERNEl_2_ + +#ifdef DLIB_ISO_CPP_ONLY +#error "DLIB_ISO_CPP_ONLY is defined so you can't use this OS dependent code. Turn DLIB_ISO_CPP_ONLY off if you want to use it." +#endif + +#include "threads_kernel_abstract.h" +#include <pthread.h> +#include <errno.h> +#include <sys/time.h> +#include "../algs.h" + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + typedef pthread_t thread_id_type; + + inline thread_id_type get_thread_id ( + ) + { + return pthread_self(); + } + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // mutex object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + // forward declaration of signaler + class signaler; + + class mutex + { + // give signaler access to hMutex + friend class signaler; + public: + + mutex ( + ) + { + if (pthread_mutex_init(&myMutex,0)) + { + throw dlib::thread_error(ECREATE_MUTEX, + "in function mutex::mutex() an error occurred making the mutex" + ); + } + } + + ~mutex ( + ) { pthread_mutex_destroy(&myMutex); } + + void lock ( + ) const { pthread_mutex_lock(&myMutex); } + + void unlock ( + ) const { pthread_mutex_unlock(&myMutex); } + + private: + + mutable pthread_mutex_t myMutex; + + // restricted functions + mutex(mutex&); // copy constructor + mutex& operator=(mutex&); // assignement opertor + }; + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // signaler object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + class signaler + { + + public: + + + signaler ( + const mutex& assoc_mutex + ) : + associated_mutex(&assoc_mutex.myMutex), + m(assoc_mutex) + { + if (pthread_cond_init(&cond,0)) + { + throw dlib::thread_error(ECREATE_SIGNALER, + "in function signaler::signaler() an error occurred making the signaler" + ); + } + } + + ~signaler ( + ) { pthread_cond_destroy(&cond); } + + void wait ( + ) const + { + pthread_cond_wait(&cond,associated_mutex); + } + + bool wait_or_timeout ( + unsigned long milliseconds + ) const + { + timespec time_to_wait; + + timeval curtime; + gettimeofday(&curtime,0); + + // get the time and adjust the timespec object by the appropriate amount + time_to_wait.tv_sec = milliseconds/1000 + curtime.tv_sec; + time_to_wait.tv_nsec = curtime.tv_usec; + time_to_wait.tv_nsec *= 1000; + time_to_wait.tv_nsec += (milliseconds%1000)*1000000; + + time_to_wait.tv_sec += time_to_wait.tv_nsec/1000000000; + time_to_wait.tv_nsec = time_to_wait.tv_nsec%1000000000; + + if ( pthread_cond_timedwait(&cond,associated_mutex,&time_to_wait) == ETIMEDOUT) + { + return false; + } + else + { + return true; + } + } + + void signal ( + ) const { pthread_cond_signal(&cond); } + + void broadcast ( + ) const { pthread_cond_broadcast(&cond); } + + const mutex& get_mutex ( + ) const { return m; } + + private: + + pthread_mutex_t* const associated_mutex; + mutable pthread_cond_t cond; + const mutex& m; + + // restricted functions + signaler(signaler&); // copy constructor + signaler& operator=(signaler&); // assignement opertor + }; + +// ---------------------------------------------------------------------------------------- + + namespace threads_kernel_shared_helpers + { + bool spawn_thread ( + void (*funct)(void*), + void* param + ); + /*! + is identical to create_new_thread() but just doesn't use any thread pooling. + !*/ + } + +// ---------------------------------------------------------------------------------------- + +} + +#include "threads_kernel_shared.h" + +#ifdef NO_MAKEFILE +#include "threads_kernel_2.cpp" +#endif + +#endif // DLIB_THREADS_KERNEl_2_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_abstract.h b/ml/dlib/dlib/threads/threads_kernel_abstract.h new file mode 100644 index 000000000..d88d37dad --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_abstract.h @@ -0,0 +1,302 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#undef DLIB_THREADS_KERNEl_ABSTRACT_ +#ifdef DLIB_THREADS_KERNEl_ABSTRACT_ + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- + + /*! + THREAD POOLING + When threads end they go into a global thread pool and each waits there + for 30 seconds before timing out and having its resources returned to the + operating system. When create_new_thread() is called it first looks in the + thread pool to see if there are any threads it can snatch from the pool, if + not then it makes a new one. + + Note that whenever I say something happens when a thread "terminates" or "ends" + I mean "when it returns to the thread pool." From the client programmer point + of view a thread terminates/ends when it returns to the dlib thread pool and you + shouldn't and indeed don't need to know when it actually gets its resources + reclaimed by the operating system. + + If you want to change the timeout to a different value you can #define + DLIB_THREAD_POOL_TIMEOUT to whatever value (in milliseconds) that you like. + + EXCEPTIONS + Unless specified otherwise, nothing in this file throws exceptions. + !*/ + +// ---------------------------------------------------------------------------------------- + + thread_id_type get_thread_id ( + ); + /*! + ensures + - returns a unique id for the calling thread. Note that while the id is unique + among all currently existing threads it may have been used by a previous + thread that has terminated. + !*/ + +// ---------------------------------------------------------------------------------------- + + bool is_dlib_thread ( + thread_id_type id = get_thread_id() + ); + /*! + ensures + - if (the thread with the given id was spawned by a call to + dlib::create_new_thread) then + - returns true + - else + - returns false + !*/ + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void register_thread_end_handler ( + T& obj, + void (T::*handler)() + ); + /*! + requires + - handler == a valid member function pointer for class T + - handler does not throw + - handler does not call register_thread_end_handler() + - handler does not block + - is_dlib_thread() == true (i.e. the calling thread was spawned by dlib::create_new_thread()) + ensures + - let ID == the thread id for the thread calling register_thread_end_handler() + - (obj.*handler)() will be called when the thread with thread id ID is + terminating and it will be called from within that terminating thread. + (i.e. inside the handler function get_thread_id() == ID == the id of the + thread that is terminating. ) + - each call to this function adds another handler that will be called when + the given thread terminates. This means that if you call it a bunch of + times then you will end up registering multiple handlers (or single + handlers multiple times) that will be called when the thread ends. + throws + - std::bad_alloc + If this exception is thrown then the call to this function had no effect. + !*/ + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void unregister_thread_end_handler ( + T& obj, + void (T::*handler)() + ); + /*! + requires + - handler == a valid member function pointer for class T + ensures + - Undoes all previous calls to register_thread_end_handler(obj,handler). + So the given handler won't be called when any threads end. + throws + - std::bad_alloc + If this exception is thrown then the call to this function had no effect. + !*/ + +// ---------------------------------------------------------------------------------------- + + bool create_new_thread ( + void (*funct)(void*), + void* param + ); + /*! + ensures + - creates a new thread for the function pointed to by funct + - passes it param as its parameter. (i.e. calls funct(param) from the new thread) + - returns true upon success and false upon failure to create the new thread + !*/ + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // mutex object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + class mutex + { + /*! + INITIAL VALUE + mutex is in the unlocked state + + WHAT THIS OBJECT REPRESENTS + This object represents a mutex intended to be used for synchronous + thread control of shared data. When a thread wants to access some + shared data it locks out other threads by calling lock() and calls + unlock() when it is finished. + !*/ + public: + + mutex ( + ); + /*! + ensures + - #*this is properly initialized + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the mutex. + !*/ + + ~mutex ( + ); + /*! + requires + - *this is not locked + ensures + - all resources allocated by *this have been freed + !*/ + + void lock ( + ) const; + /*! + requires + - the thread calling lock() does not already have a lock on *this + ensures + - if (*this is currently locked by another thread) then + - the thread that called lock() on *this is put to sleep until + it becomes available + - if (*this is currently unlocked) then + - #*this becomes locked and the current thread is NOT put to sleep + but now "owns" #*this + !*/ + + void unlock ( + ) const; + /*! + requires + - the thread calling unlock() already has a lock on *this + ensures + - #*this is unlocked (i.e. other threads may now lock this object) + !*/ + + + private: + // restricted functions + mutex(mutex&); // copy constructor + mutex& operator=(mutex&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + // signaler object +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + class signaler + { + /*! + + WHAT THIS OBJECT REPRESENTS + This object represents an event signaling system for threads. It gives + a thread the ability to wake up other threads that are waiting for a + particular signal. + + Each signaler object is associated with one and only one mutex object. + More than one signaler object may be associated with a single mutex + but a signaler object may only be associated with a single mutex. + + NOTE: + You must guard against spurious wakeups. This means that a thread + might return from a call to wait even if no other thread called + signal. This is rare but must be guarded against. + !*/ + public: + + signaler ( + const mutex& associated_mutex + ); + /*! + ensures + - #*this is properly initialized + - #get_mutex() == associated_mutex + throws + - dlib::thread_error + the constructor may throw this exception if there is a problem + gathering resources to create the signaler. + !*/ + + + ~signaler ( + ); + /*! + ensures + - all resources allocated by *this have been freed + !*/ + + void wait ( + ) const; + /*! + requires + - get_mutex() is locked and owned by the calling thread + ensures + - atomically unlocks get_mutex() and blocks the calling thread + - calling thread may wake if another thread calls signal() or broadcast() + on *this + - when wait() returns the calling thread again has a lock on get_mutex() + !*/ + + bool wait_or_timeout ( + unsigned long milliseconds + ) const; + /*! + requires + - get_mutex() is locked and owned by the calling thread + ensures + - atomically unlocks get_mutex() and blocks the calling thread + - calling thread may wake if another thread calls signal() or broadcast() + on *this + - after the specified number of milliseconds has elapsed the calling thread + will wake once get_mutex() is free + - when wait returns the calling thread again has a lock on get_mutex() + + - returns false if the call to wait_or_timeout timed out + - returns true if the call did not time out + !*/ + + + void signal ( + ) const; + /*! + ensures + - if (at least one thread is waiting on *this) then + - at least one of the waiting threads will wake + !*/ + + void broadcast ( + ) const; + /*! + ensures + - any and all threads waiting on *this will wake + !*/ + + const mutex& get_mutex ( + ) const; + /*! + ensures + - returns a const reference to the mutex associated with *this + !*/ + + private: + // restricted functions + signaler(signaler&); // copy constructor + signaler& operator=(signaler&); // assignment operator + }; + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREADS_KERNEl_ABSTRACT_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_shared.cpp b/ml/dlib/dlib/threads/threads_kernel_shared.cpp new file mode 100644 index 000000000..8e81193e9 --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_shared.cpp @@ -0,0 +1,318 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEL_SHARED_CPp_ +#define DLIB_THREADS_KERNEL_SHARED_CPp_ + +#include "threads_kernel_shared.h" +#include "../assert.h" +#include "../platform.h" +#include <iostream> + + +#ifndef DLIB_THREAD_POOL_TIMEOUT +// default to 30000 milliseconds +#define DLIB_THREAD_POOL_TIMEOUT 30000 +#endif + +namespace dlib +{ + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- +// threader functions +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + namespace threads_kernel_shared + { + + bool thread_pool_has_been_destroyed = false; + +// ---------------------------------------------------------------------------------------- + + struct threader_destruct_helper + { + // cause the thread pool to begin its destruction process when + // global objects start to be destroyed + ~threader_destruct_helper() + { + thread_pool().destruct_if_ready(); + } + }; + +// ---------------------------------------------------------------------------------------- + + threader& thread_pool ( + ) + { + static threader* thread_pool = new threader; + static threader_destruct_helper a; + return *thread_pool; + } + +// ---------------------------------------------------------------------------------------- + + bool threader:: + is_dlib_thread ( + thread_id_type id + ) + { + auto_mutex M(data_mutex); + return thread_ids.is_member(id); + } + +// ---------------------------------------------------------------------------------------- + + threader:: + threader ( + ) : + total_count(0), + function_pointer(0), + pool_count(0), + data_ready(data_mutex), + data_empty(data_mutex), + destruct(false), + destructed(data_mutex), + do_not_ever_destruct(false) + { +#ifdef WIN32 + // Trying to destroy the global thread pool when we are part of a DLL and the + // DLL is being unloaded can sometimes lead to weird behavior. For example, in + // the python interpreter you will get the interpreter to hang. Or if we are + // part of a MATLAB mex file and the file is being unloaded there can also be + // similar weird issues. So when we are using dlib on windows we just disable + // the destruction of the global thread pool since it doesn't matter anyway. + // It's resources will just get freed by the OS. This is even the recommended + // thing to do by Microsoft (http://blogs.msdn.com/b/oldnewthing/archive/2012/01/05/10253268.aspx). + // + // As an aside, it's worth pointing out that the reason we try and free + // resources on program shutdown on other operating systems is so we can have + // clean reports from tools like valgrind which check for memory leaks. But + // trying to do this on windows is a lost cause so we give up in this case and + // follow the Microsoft recommendation. + do_not_ever_destruct = true; +#endif // WIN32 + } + +// ---------------------------------------------------------------------------------------- + + threader:: + ~threader ( + ) + { + data_mutex.lock(); + destruct = true; + data_ready.broadcast(); + + // wait for all the threads to end + while (total_count > 0) + destructed.wait(); + + thread_pool_has_been_destroyed = true; + data_mutex.unlock(); + } + +// ---------------------------------------------------------------------------------------- + + void threader:: + destruct_if_ready ( + ) + { + if (do_not_ever_destruct) + return; + + data_mutex.lock(); + + // if there aren't any active threads, just maybe some sitting around + // in the pool then just destroy the threader + if (total_count == pool_count) + { + destruct = true; + data_ready.broadcast(); + data_mutex.unlock(); + delete this; + } + else + { + // There are still some user threads running so there isn't + // much we can really do. Just let the program end without + // cleaning up threading resources. + data_mutex.unlock(); + } + } + +// ---------------------------------------------------------------------------------------- + + void threader:: + call_end_handlers ( + ) + { + reg.m.lock(); + const thread_id_type id = get_thread_id(); + thread_id_type id_copy; + member_function_pointer<> mfp; + + // Remove all the member function pointers for this thread from the tree + // and call them. + while (reg.reg[id] != 0) + { + reg.reg.remove(id,id_copy,mfp); + reg.m.unlock(); + mfp(); + reg.m.lock(); + } + reg.m.unlock(); + } + + // ------------------------------------------------------------------------------------ + + bool threader:: + create_new_thread ( + void (*funct)(void*), + void* param + ) + { + + // get a lock on the data mutex + auto_mutex M(data_mutex); + + // loop to ensure that the new function pointer is in the data + while (true) + { + // if the data is empty then add new data and quit loop + if (function_pointer == 0) + { + parameter = param; + function_pointer = funct; + break; + } + else + { + // wait for data to become empty + data_empty.wait(); + } + } + + + // get a thread for this new data + // if a new thread must be created + if (pool_count == 0) + { + // make thread and add it to the pool + if ( threads_kernel_shared_helpers::spawn_thread(thread_starter, this) == false ) + { + function_pointer = 0; + parameter = 0; + data_empty.signal(); + return false; + } + ++total_count; + } + // wake up a thread from the pool + else + { + data_ready.signal(); + } + + return true; + } + + // ------------------------------------------------------------------------------------ + + void thread_starter ( + void* object + ) + { + // get a reference to the calling threader object + threader& self = *static_cast<threader*>(object); + + + { + auto_mutex M(self.data_mutex); + + // add this thread id + thread_id_type thread_id = get_thread_id(); + self.thread_ids.add(thread_id); + + // indicate that this thread is now in the thread pool + ++self.pool_count; + + while (self.destruct == false) + { + // if data is ready then process it and launch the thread + // if its not ready then go back into the pool + while (self.function_pointer != 0) + { + // indicate that this thread is now out of the thread pool + --self.pool_count; + + // get the data for the function call + void (*funct)(void*) = self.function_pointer; + void* param = self.parameter; + self.function_pointer = 0; + + // signal that the data is now empty + self.data_empty.signal(); + + self.data_mutex.unlock(); + // Call funct with its intended parameter. If this function throws then + // we intentionally let the exception escape the thread and result in whatever + // happens when it gets caught by the OS (generally the program is terminated). + funct(param); + self.call_end_handlers(); + + self.data_mutex.lock(); + + // indicate that this thread is now back in the thread pool + ++self.pool_count; + } + + if (self.destruct == true) + break; + + // if we timed out and there isn't any work to do then + // this thread will quit this loop and end. + if (self.data_ready.wait_or_timeout(DLIB_THREAD_POOL_TIMEOUT) == false && + self.function_pointer == 0) + break; + + } + + // remove this thread id from thread_ids + thread_id = get_thread_id(); + self.thread_ids.destroy(thread_id); + + // indicate that this thread is now out of the thread pool + --self.pool_count; + --self.total_count; + + self.destructed.signal(); + + } // end of auto_mutex M(self.data_mutex) block + } + + // ------------------------------------------------------------------------------------ + + } + +// ---------------------------------------------------------------------------------------- + + bool is_dlib_thread ( + thread_id_type id + ) + { + return threads_kernel_shared::thread_pool().is_dlib_thread(id); + } + + bool is_dlib_thread ( + ) + { + return is_dlib_thread(get_thread_id()); + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_THREADS_KERNEL_SHARED_CPp_ + diff --git a/ml/dlib/dlib/threads/threads_kernel_shared.h b/ml/dlib/dlib/threads/threads_kernel_shared.h new file mode 100644 index 000000000..b4526e8db --- /dev/null +++ b/ml/dlib/dlib/threads/threads_kernel_shared.h @@ -0,0 +1,274 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEl_SHARED_ +#define DLIB_THREADS_KERNEl_SHARED_ + +// this file should be included at the bottom of one of the thread kernel headers for a +// specific platform. +//#include "../threads.h" +#include "auto_mutex_extension.h" +#include "../binary_search_tree.h" +#include "../member_function_pointer.h" +#include "../memory_manager.h" +#include "../queue.h" +#include "../set.h" +#include "../test_for_odr_violations.h" + + + + + +namespace dlib +{ + + +// ---------------------------------------------------------------------------------------- + + namespace threads_kernel_shared + { + void thread_starter ( + void* + ); + + class threader + { + /*! + INITIAL VALUE + - pool_count == 0 and + - data_ready is associated with the mutex data_mutex + - data_empty is associated with the mutex data_mutex + - destructed is associated with the mutex data_mutex + - destruct == false + - total_count == 0 + - function_pointer == 0 + - do_not_ever_destruct == false + + CONVENTION + - data_ready is associated with the mutex data_mutex + - data_empty is associated with the mutex data_mutex + - data_ready == a signaler used signal when there is new data waiting + to start a thread with. + - data_empty == a signaler used to signal when the data is now empty + - pool_count == the number of suspended threads in the thread pool + - total_count == the number of threads that are executing anywhere. i.e. + pool_count + the ones that are currently running some user function. + - if (function_pointer != 0) then + - parameter == a void pointer pointing to the parameter which + should be used to start the next thread + - function_pointer == a pointer to the next function to make a + new thread with + + - if (the destructor is running) then + - destruct == true + - else + - destruct == false + + - thread_ids is locked by the data_mutex + - thread_ids == a set that contains the thread id for each thread spawned by this + object. + !*/ + + + public: + threader ( + ); + + ~threader ( + ); + + void destruct_if_ready ( + ); + /*! + ensures + - if (there are no threads currently running and we haven't set do_not_ever_destruct) then + - calls delete this + - else + - does nothing + !*/ + + bool create_new_thread ( + void (*funct)(void*), + void* param + ); + + template < + typename T + > + void unregister_thread_end_handler ( + T& obj, + void (T::*handler)() + ) + { + member_function_pointer<> mfp, junk_mfp; + mfp.set(obj,handler); + + thread_id_type junk_id; + + // find any member function pointers in the registry that point to the same + // thing as mfp and remove them + auto_mutex M(reg.m); + reg.reg.reset(); + while (reg.reg.move_next()) + { + while (reg.reg.current_element_valid() && reg.reg.element().value() == mfp) + { + reg.reg.remove_current_element(junk_id, junk_mfp); + } + } + } + + template < + typename T + > + void register_thread_end_handler ( + T& obj, + void (T::*handler)() + ) + { + thread_id_type id = get_thread_id(); + member_function_pointer<> mfp; + mfp.set(obj,handler); + + auto_mutex M(reg.m); + reg.reg.add(id,mfp); + } + + bool is_dlib_thread ( + thread_id_type id + ); + + private: + + friend void thread_starter ( + void* + ); + + void call_end_handlers ( + ); + /*! + ensures + - calls the registered end handlers for the calling thread and + then removes them from reg.reg + !*/ + + + // private data + set<thread_id_type,memory_manager<char>::kernel_2b>::kernel_1b_c thread_ids; + unsigned long total_count; + void* parameter; + void (*function_pointer)(void*); + unsigned long pool_count; + mutex data_mutex; // mutex to protect the above data + signaler data_ready; // signaler to signal when there is new data + signaler data_empty; // signaler to signal when the data is empty + bool destruct; + signaler destructed; // signaler to signal when a thread has ended + bool do_not_ever_destruct; + + struct registry_type + { + mutex m; + binary_search_tree< + thread_id_type, + member_function_pointer<>, + memory_manager<char>::kernel_2a + >::kernel_2a_c reg; + }; + + // stuff for the register_thread_end_handler + registry_type reg; + + + // restricted functions + threader(threader&); // copy constructor + threader& operator=(threader&); // assignement opertor + + }; + + // ------------------------------------------------------------------------------------ + + threader& thread_pool ( + ); + /*! + ensures + - returns a reference to the global threader object + !*/ + + // ------------------------------------------------------------------------------------ + + extern bool thread_pool_has_been_destroyed; + } + + bool is_dlib_thread ( + thread_id_type id + ); + + bool is_dlib_thread ( + ); + +// ---------------------------------------------------------------------------------------- + + inline bool create_new_thread ( + void (*funct)(void*), + void* param + ) + { + try + { + // now make this thread + return threads_kernel_shared::thread_pool().create_new_thread(funct,param); + } + catch (std::bad_alloc&) + { + return false; + } + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + inline void register_thread_end_handler ( + T& obj, + void (T::*handler)() + ) + { + DLIB_ASSERT(is_dlib_thread(), + "\tvoid register_thread_end_handler" + << "\n\tYou can't register a thread end handler for a thread dlib didn't spawn." + ); + + threads_kernel_shared::thread_pool().register_thread_end_handler(obj,handler); + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + inline void unregister_thread_end_handler ( + T& obj, + void (T::*handler)() + ) + { + // Check if the thread pool has been destroyed and if it has then don't do anything. + // This bool here is always true except when the program has started to terminate and + // the thread pool object has been destroyed. This if is here to catch other global + // objects that have destructors that try to call unregister_thread_end_handler(). + // Without this check we get into trouble if the thread pool is destroyed before these + // objects. + if (threads_kernel_shared::thread_pool_has_been_destroyed == false) + threads_kernel_shared::thread_pool().unregister_thread_end_handler(obj,handler); + } + +// ---------------------------------------------------------------------------------------- + +} + +#ifdef NO_MAKEFILE +#include "threads_kernel_shared.cpp" +#endif + +#endif // DLIB_THREADS_KERNEl_SHARED_ + diff --git a/ml/dlib/dlib/threads/windows.h b/ml/dlib/dlib/threads/windows.h new file mode 100644 index 000000000..f7c775950 --- /dev/null +++ b/ml/dlib/dlib/threads/windows.h @@ -0,0 +1,6 @@ +// Copyright (C) 2003 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_THREADS_KERNEl_2_ +#include "threads_kernel_1.h" +#endif + |