diff options
Diffstat (limited to '')
-rw-r--r-- | ml/dlib/dlib/pipe/pipe_kernel_1.h | 756 |
1 files changed, 756 insertions, 0 deletions
diff --git a/ml/dlib/dlib/pipe/pipe_kernel_1.h b/ml/dlib/dlib/pipe/pipe_kernel_1.h new file mode 100644 index 000000000..543754121 --- /dev/null +++ b/ml/dlib/dlib/pipe/pipe_kernel_1.h @@ -0,0 +1,756 @@ +// Copyright (C) 2006 Davis E. King (davis@dlib.net) +// License: Boost Software License See LICENSE.txt for the full license. +#ifndef DLIB_PIPE_KERNEl_1_ +#define DLIB_PIPE_KERNEl_1_ + +#include "../algs.h" +#include "../threads.h" +#include "pipe_kernel_abstract.h" + +namespace dlib +{ + + template < + typename T + > + class pipe + { + /*! + INITIAL VALUE + - pipe_size == 0 + - pipe_max_size == defined by constructor + - enabled == true + - data == a pointer to an array of ((pipe_max_size>0)?pipe_max_size:1) T objects. + - dequeue_waiters == 0 + - enqueue_waiters == 0 + - first == 1 + - last == 1 + - unblock_sig_waiters == 0 + + CONVENTION + - size() == pipe_size + - max_size() == pipe_max_size + - is_enabled() == enabled + + - m == the mutex used to lock access to all the members of this class + + - dequeue_waiters == the number of threads blocked on calls to dequeue() + - enqueue_waiters == the number of threads blocked on calls to enqueue() and + wait_until_empty() + - unblock_sig_waiters == the number of threads blocked on calls to + wait_for_num_blocked_dequeues() and the destructor. (i.e. the number of + blocking calls to unblock_sig.wait()) + + - dequeue_sig == the signaler that threads blocked on calls to dequeue() wait on + - enqueue_sig == the signaler that threads blocked on calls to enqueue() + or wait_until_empty() wait on. + - unblock_sig == the signaler that is signaled when a thread stops blocking on a call + to enqueue() or dequeue(). It is also signaled when a dequeue that will probably + block is called. The destructor and wait_for_num_blocked_dequeues are the only + things that will wait on this signaler. + + - if (pipe_size > 0) then + - data[first] == the next item to dequeue + - data[last] == the item most recently added via enqueue, so the last to dequeue. + - else if (pipe_max_size == 0) + - if (first == 0 && last == 0) then + - data[0] == the next item to dequeue + - else if (first == 0 && last == 1) then + - data[0] has been taken out already by a dequeue + !*/ + + public: + // this is here for backwards compatibility with older versions of dlib. + typedef pipe kernel_1a; + + typedef T type; + + explicit pipe ( + size_t maximum_size + ); + + virtual ~pipe ( + ); + + void empty ( + ); + + void wait_until_empty ( + ) const; + + void wait_for_num_blocked_dequeues ( + unsigned long num + )const; + + void enable ( + ); + + void disable ( + ); + + bool is_enqueue_enabled ( + ) const; + + void disable_enqueue ( + ); + + void enable_enqueue ( + ); + + bool is_dequeue_enabled ( + ) const; + + void disable_dequeue ( + ); + + void enable_dequeue ( + ); + + bool is_enabled ( + ) const; + + size_t max_size ( + ) const; + + size_t size ( + ) const; + + bool enqueue ( + T& item + ); + + bool enqueue ( + T&& item + ) { return enqueue(item); } + + bool dequeue ( + T& item + ); + + bool enqueue_or_timeout ( + T& item, + unsigned long timeout + ); + + bool enqueue_or_timeout ( + T&& item, + unsigned long timeout + ) { return enqueue_or_timeout(item,timeout); } + + bool dequeue_or_timeout ( + T& item, + unsigned long timeout + ); + + private: + + size_t pipe_size; + const size_t pipe_max_size; + bool enabled; + + T* const data; + + size_t first; + size_t last; + + mutex m; + signaler dequeue_sig; + signaler enqueue_sig; + signaler unblock_sig; + + unsigned long dequeue_waiters; + mutable unsigned long enqueue_waiters; + mutable unsigned long unblock_sig_waiters; + bool enqueue_enabled; + bool dequeue_enabled; + + // restricted functions + pipe(const pipe&); // copy constructor + pipe& operator=(const pipe&); // assignment operator + + }; + +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- +// member function definitions +// ---------------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + pipe<T>:: + pipe ( + size_t maximum_size + ) : + pipe_size(0), + pipe_max_size(maximum_size), + enabled(true), + data(new T[(maximum_size>0) ? maximum_size : 1]), + first(1), + last(1), + dequeue_sig(m), + enqueue_sig(m), + unblock_sig(m), + dequeue_waiters(0), + enqueue_waiters(0), + unblock_sig_waiters(0), + enqueue_enabled(true), + dequeue_enabled(true) + { + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + pipe<T>:: + ~pipe ( + ) + { + auto_mutex M(m); + ++unblock_sig_waiters; + + // first make sure no one is blocked on any calls to enqueue() or dequeue() + enabled = false; + dequeue_sig.broadcast(); + enqueue_sig.broadcast(); + unblock_sig.broadcast(); + + // wait for all threads to unblock + while (dequeue_waiters > 0 || enqueue_waiters > 0 || unblock_sig_waiters > 1) + unblock_sig.wait(); + + delete [] data; + --unblock_sig_waiters; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + empty ( + ) + { + auto_mutex M(m); + pipe_size = 0; + + // let any calls to enqueue() know that the pipe is now empty + if (enqueue_waiters > 0) + enqueue_sig.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + wait_until_empty ( + ) const + { + auto_mutex M(m); + // this function is sort of like a call to enqueue so treat it like that + ++enqueue_waiters; + + while (pipe_size > 0 && enabled && dequeue_enabled ) + enqueue_sig.wait(); + + // let the destructor know we are ending if it is blocked waiting + if (enabled == false) + unblock_sig.broadcast(); + + --enqueue_waiters; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + enable ( + ) + { + auto_mutex M(m); + enabled = true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + disable ( + ) + { + auto_mutex M(m); + enabled = false; + dequeue_sig.broadcast(); + enqueue_sig.broadcast(); + unblock_sig.broadcast(); + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + is_enabled ( + ) const + { + auto_mutex M(m); + return enabled; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + size_t pipe<T>:: + max_size ( + ) const + { + auto_mutex M(m); + return pipe_max_size; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + size_t pipe<T>:: + size ( + ) const + { + auto_mutex M(m); + return pipe_size; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + enqueue ( + T& item + ) + { + auto_mutex M(m); + ++enqueue_waiters; + + // wait until there is room or we are disabled + while (pipe_size == pipe_max_size && enabled && enqueue_enabled && + !(pipe_max_size == 0 && first == 1) ) + enqueue_sig.wait(); + + if (enabled == false || enqueue_enabled == false) + { + --enqueue_waiters; + // let the destructor know we are unblocking + unblock_sig.broadcast(); + return false; + } + + // set the appropriate values for first and last + if (pipe_size == 0) + { + first = 0; + last = 0; + } + else + { + last = (last+1)%pipe_max_size; + } + + + exchange(item,data[last]); + + // wake up a call to dequeue() if there are any currently blocked + if (dequeue_waiters > 0) + dequeue_sig.signal(); + + if (pipe_max_size > 0) + { + ++pipe_size; + } + else + { + // wait for a dequeue to take the item out + while (last == 0 && enabled && enqueue_enabled) + enqueue_sig.wait(); + + if (last == 0 && (enabled == false || enqueue_enabled == false)) + { + last = 1; + first = 1; + + // no one dequeued this object to put it back into item + exchange(item,data[0]); + + --enqueue_waiters; + // let the destructor know we are unblocking + if (unblock_sig_waiters > 0) + unblock_sig.broadcast(); + return false; + } + + last = 1; + first = 1; + + // tell any waiting calls to enqueue() that one of them can proceed + if (enqueue_waiters > 1) + enqueue_sig.broadcast(); + + // let the destructor know we are unblocking + if (enabled == false && unblock_sig_waiters > 0) + unblock_sig.broadcast(); + } + + --enqueue_waiters; + return true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + dequeue ( + T& item + ) + { + auto_mutex M(m); + ++dequeue_waiters; + + if (pipe_size == 0) + { + // notify wait_for_num_blocked_dequeues() + if (unblock_sig_waiters > 0) + unblock_sig.broadcast(); + + // notify any blocked enqueue_or_timeout() calls + if (enqueue_waiters > 0) + enqueue_sig.broadcast(); + } + + // wait until there is something in the pipe or we are disabled + while (pipe_size == 0 && enabled && dequeue_enabled && + !(pipe_max_size == 0 && first == 0 && last == 0) ) + dequeue_sig.wait(); + + if (enabled == false || dequeue_enabled == false) + { + --dequeue_waiters; + // let the destructor know we are unblocking + unblock_sig.broadcast(); + return false; + } + + exchange(item,data[first]); + + if (pipe_max_size > 0) + { + // set the appropriate values for first + first = (first+1)%pipe_max_size; + + --pipe_size; + } + else + { + // let the enqueue waiting on us know that we took the + // item out already. + last = 1; + } + + // wake up a call to enqueue() if there are any currently blocked + if (enqueue_waiters > 0) + enqueue_sig.broadcast(); + + --dequeue_waiters; + return true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + enqueue_or_timeout ( + T& item, + unsigned long timeout + ) + { + auto_mutex M(m); + ++enqueue_waiters; + + // wait until there is room or we are disabled or + // we run out of time. + bool timed_out = false; + while (pipe_size == pipe_max_size && enabled && enqueue_enabled && + !(pipe_max_size == 0 && dequeue_waiters > 0 && first == 1) ) + { + if (timeout == 0 || enqueue_sig.wait_or_timeout(timeout) == false) + { + timed_out = true; + break; + } + } + + if (enabled == false || timed_out || enqueue_enabled == false) + { + --enqueue_waiters; + // let the destructor know we are unblocking + unblock_sig.broadcast(); + return false; + } + + // set the appropriate values for first and last + if (pipe_size == 0) + { + first = 0; + last = 0; + } + else + { + last = (last+1)%pipe_max_size; + } + + + exchange(item,data[last]); + + // wake up a call to dequeue() if there are any currently blocked + if (dequeue_waiters > 0) + dequeue_sig.signal(); + + if (pipe_max_size > 0) + { + ++pipe_size; + } + else + { + // wait for a dequeue to take the item out + while (last == 0 && enabled && enqueue_enabled) + enqueue_sig.wait(); + + if (last == 0 && (enabled == false || enqueue_enabled == false)) + { + last = 1; + first = 1; + + // no one dequeued this object to put it back into item + exchange(item,data[0]); + + --enqueue_waiters; + // let the destructor know we are unblocking + if (unblock_sig_waiters > 0) + unblock_sig.broadcast(); + return false; + } + + last = 1; + first = 1; + + // tell any waiting calls to enqueue() that one of them can proceed + if (enqueue_waiters > 1) + enqueue_sig.broadcast(); + + // let the destructor know we are unblocking + if (enabled == false && unblock_sig_waiters > 0) + unblock_sig.broadcast(); + } + + --enqueue_waiters; + return true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + dequeue_or_timeout ( + T& item, + unsigned long timeout + ) + { + auto_mutex M(m); + ++dequeue_waiters; + + if (pipe_size == 0) + { + // notify wait_for_num_blocked_dequeues() + if (unblock_sig_waiters > 0) + unblock_sig.broadcast(); + + // notify any blocked enqueue_or_timeout() calls + if (enqueue_waiters > 0) + enqueue_sig.broadcast(); + } + + bool timed_out = false; + // wait until there is something in the pipe or we are disabled or we timeout. + while (pipe_size == 0 && enabled && dequeue_enabled && + !(pipe_max_size == 0 && first == 0 && last == 0) ) + { + if (timeout == 0 || dequeue_sig.wait_or_timeout(timeout) == false) + { + timed_out = true; + break; + } + } + + if (enabled == false || timed_out || dequeue_enabled == false) + { + --dequeue_waiters; + // let the destructor know we are unblocking + unblock_sig.broadcast(); + return false; + } + + exchange(item,data[first]); + + if (pipe_max_size > 0) + { + // set the appropriate values for first + first = (first+1)%pipe_max_size; + + --pipe_size; + } + else + { + // let the enqueue waiting on us know that we took the + // item out already. + last = 1; + } + + // wake up a call to enqueue() if there are any currently blocked + if (enqueue_waiters > 0) + enqueue_sig.broadcast(); + + --dequeue_waiters; + return true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + wait_for_num_blocked_dequeues ( + unsigned long num + )const + { + auto_mutex M(m); + ++unblock_sig_waiters; + + while ( (dequeue_waiters < num || pipe_size != 0) && enabled && dequeue_enabled) + unblock_sig.wait(); + + // let the destructor know we are ending if it is blocked waiting + if (enabled == false) + unblock_sig.broadcast(); + + --unblock_sig_waiters; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + is_enqueue_enabled ( + ) const + { + auto_mutex M(m); + return enqueue_enabled; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + disable_enqueue ( + ) + { + auto_mutex M(m); + enqueue_enabled = false; + enqueue_sig.broadcast(); + } + + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + enable_enqueue ( + ) + { + auto_mutex M(m); + enqueue_enabled = true; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + bool pipe<T>:: + is_dequeue_enabled ( + ) const + { + auto_mutex M(m); + return dequeue_enabled; + } + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + disable_dequeue ( + ) + { + auto_mutex M(m); + dequeue_enabled = false; + dequeue_sig.broadcast(); + } + + +// ---------------------------------------------------------------------------------------- + + template < + typename T + > + void pipe<T>:: + enable_dequeue ( + ) + { + auto_mutex M(m); + dequeue_enabled = true; + } + +// ---------------------------------------------------------------------------------------- + +} + +#endif // DLIB_PIPE_KERNEl_1_ + |