summaryrefslogtreecommitdiffstats
path: root/ml/dlib/dlib/pipe/pipe_kernel_1.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /ml/dlib/dlib/pipe/pipe_kernel_1.h
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'ml/dlib/dlib/pipe/pipe_kernel_1.h')
-rw-r--r--ml/dlib/dlib/pipe/pipe_kernel_1.h756
1 files changed, 756 insertions, 0 deletions
diff --git a/ml/dlib/dlib/pipe/pipe_kernel_1.h b/ml/dlib/dlib/pipe/pipe_kernel_1.h
new file mode 100644
index 000000000..543754121
--- /dev/null
+++ b/ml/dlib/dlib/pipe/pipe_kernel_1.h
@@ -0,0 +1,756 @@
+// Copyright (C) 2006 Davis E. King (davis@dlib.net)
+// License: Boost Software License See LICENSE.txt for the full license.
+#ifndef DLIB_PIPE_KERNEl_1_
+#define DLIB_PIPE_KERNEl_1_
+
+#include "../algs.h"
+#include "../threads.h"
+#include "pipe_kernel_abstract.h"
+
+namespace dlib
+{
+
+ template <
+ typename T
+ >
+ class pipe
+ {
+ /*!
+ INITIAL VALUE
+ - pipe_size == 0
+ - pipe_max_size == defined by constructor
+ - enabled == true
+ - data == a pointer to an array of ((pipe_max_size>0)?pipe_max_size:1) T objects.
+ - dequeue_waiters == 0
+ - enqueue_waiters == 0
+ - first == 1
+ - last == 1
+ - unblock_sig_waiters == 0
+
+ CONVENTION
+ - size() == pipe_size
+ - max_size() == pipe_max_size
+ - is_enabled() == enabled
+
+ - m == the mutex used to lock access to all the members of this class
+
+ - dequeue_waiters == the number of threads blocked on calls to dequeue()
+ - enqueue_waiters == the number of threads blocked on calls to enqueue() and
+ wait_until_empty()
+ - unblock_sig_waiters == the number of threads blocked on calls to
+ wait_for_num_blocked_dequeues() and the destructor. (i.e. the number of
+ blocking calls to unblock_sig.wait())
+
+ - dequeue_sig == the signaler that threads blocked on calls to dequeue() wait on
+ - enqueue_sig == the signaler that threads blocked on calls to enqueue()
+ or wait_until_empty() wait on.
+ - unblock_sig == the signaler that is signaled when a thread stops blocking on a call
+ to enqueue() or dequeue(). It is also signaled when a dequeue that will probably
+ block is called. The destructor and wait_for_num_blocked_dequeues are the only
+ things that will wait on this signaler.
+
+ - if (pipe_size > 0) then
+ - data[first] == the next item to dequeue
+ - data[last] == the item most recently added via enqueue, so the last to dequeue.
+ - else if (pipe_max_size == 0)
+ - if (first == 0 && last == 0) then
+ - data[0] == the next item to dequeue
+ - else if (first == 0 && last == 1) then
+ - data[0] has been taken out already by a dequeue
+ !*/
+
+ public:
+ // this is here for backwards compatibility with older versions of dlib.
+ typedef pipe kernel_1a;
+
+ typedef T type;
+
+ explicit pipe (
+ size_t maximum_size
+ );
+
+ virtual ~pipe (
+ );
+
+ void empty (
+ );
+
+ void wait_until_empty (
+ ) const;
+
+ void wait_for_num_blocked_dequeues (
+ unsigned long num
+ )const;
+
+ void enable (
+ );
+
+ void disable (
+ );
+
+ bool is_enqueue_enabled (
+ ) const;
+
+ void disable_enqueue (
+ );
+
+ void enable_enqueue (
+ );
+
+ bool is_dequeue_enabled (
+ ) const;
+
+ void disable_dequeue (
+ );
+
+ void enable_dequeue (
+ );
+
+ bool is_enabled (
+ ) const;
+
+ size_t max_size (
+ ) const;
+
+ size_t size (
+ ) const;
+
+ bool enqueue (
+ T& item
+ );
+
+ bool enqueue (
+ T&& item
+ ) { return enqueue(item); }
+
+ bool dequeue (
+ T& item
+ );
+
+ bool enqueue_or_timeout (
+ T& item,
+ unsigned long timeout
+ );
+
+ bool enqueue_or_timeout (
+ T&& item,
+ unsigned long timeout
+ ) { return enqueue_or_timeout(item,timeout); }
+
+ bool dequeue_or_timeout (
+ T& item,
+ unsigned long timeout
+ );
+
+ private:
+
+ size_t pipe_size;
+ const size_t pipe_max_size;
+ bool enabled;
+
+ T* const data;
+
+ size_t first;
+ size_t last;
+
+ mutex m;
+ signaler dequeue_sig;
+ signaler enqueue_sig;
+ signaler unblock_sig;
+
+ unsigned long dequeue_waiters;
+ mutable unsigned long enqueue_waiters;
+ mutable unsigned long unblock_sig_waiters;
+ bool enqueue_enabled;
+ bool dequeue_enabled;
+
+ // restricted functions
+ pipe(const pipe&); // copy constructor
+ pipe& operator=(const pipe&); // assignment operator
+
+ };
+
+// ----------------------------------------------------------------------------------------
+// ----------------------------------------------------------------------------------------
+// member function definitions
+// ----------------------------------------------------------------------------------------
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ pipe<T>::
+ pipe (
+ size_t maximum_size
+ ) :
+ pipe_size(0),
+ pipe_max_size(maximum_size),
+ enabled(true),
+ data(new T[(maximum_size>0) ? maximum_size : 1]),
+ first(1),
+ last(1),
+ dequeue_sig(m),
+ enqueue_sig(m),
+ unblock_sig(m),
+ dequeue_waiters(0),
+ enqueue_waiters(0),
+ unblock_sig_waiters(0),
+ enqueue_enabled(true),
+ dequeue_enabled(true)
+ {
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ pipe<T>::
+ ~pipe (
+ )
+ {
+ auto_mutex M(m);
+ ++unblock_sig_waiters;
+
+ // first make sure no one is blocked on any calls to enqueue() or dequeue()
+ enabled = false;
+ dequeue_sig.broadcast();
+ enqueue_sig.broadcast();
+ unblock_sig.broadcast();
+
+ // wait for all threads to unblock
+ while (dequeue_waiters > 0 || enqueue_waiters > 0 || unblock_sig_waiters > 1)
+ unblock_sig.wait();
+
+ delete [] data;
+ --unblock_sig_waiters;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ empty (
+ )
+ {
+ auto_mutex M(m);
+ pipe_size = 0;
+
+ // let any calls to enqueue() know that the pipe is now empty
+ if (enqueue_waiters > 0)
+ enqueue_sig.broadcast();
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ wait_until_empty (
+ ) const
+ {
+ auto_mutex M(m);
+ // this function is sort of like a call to enqueue so treat it like that
+ ++enqueue_waiters;
+
+ while (pipe_size > 0 && enabled && dequeue_enabled )
+ enqueue_sig.wait();
+
+ // let the destructor know we are ending if it is blocked waiting
+ if (enabled == false)
+ unblock_sig.broadcast();
+
+ --enqueue_waiters;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ enable (
+ )
+ {
+ auto_mutex M(m);
+ enabled = true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ disable (
+ )
+ {
+ auto_mutex M(m);
+ enabled = false;
+ dequeue_sig.broadcast();
+ enqueue_sig.broadcast();
+ unblock_sig.broadcast();
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ is_enabled (
+ ) const
+ {
+ auto_mutex M(m);
+ return enabled;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ size_t pipe<T>::
+ max_size (
+ ) const
+ {
+ auto_mutex M(m);
+ return pipe_max_size;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ size_t pipe<T>::
+ size (
+ ) const
+ {
+ auto_mutex M(m);
+ return pipe_size;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ enqueue (
+ T& item
+ )
+ {
+ auto_mutex M(m);
+ ++enqueue_waiters;
+
+ // wait until there is room or we are disabled
+ while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
+ !(pipe_max_size == 0 && first == 1) )
+ enqueue_sig.wait();
+
+ if (enabled == false || enqueue_enabled == false)
+ {
+ --enqueue_waiters;
+ // let the destructor know we are unblocking
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ // set the appropriate values for first and last
+ if (pipe_size == 0)
+ {
+ first = 0;
+ last = 0;
+ }
+ else
+ {
+ last = (last+1)%pipe_max_size;
+ }
+
+
+ exchange(item,data[last]);
+
+ // wake up a call to dequeue() if there are any currently blocked
+ if (dequeue_waiters > 0)
+ dequeue_sig.signal();
+
+ if (pipe_max_size > 0)
+ {
+ ++pipe_size;
+ }
+ else
+ {
+ // wait for a dequeue to take the item out
+ while (last == 0 && enabled && enqueue_enabled)
+ enqueue_sig.wait();
+
+ if (last == 0 && (enabled == false || enqueue_enabled == false))
+ {
+ last = 1;
+ first = 1;
+
+ // no one dequeued this object to put it back into item
+ exchange(item,data[0]);
+
+ --enqueue_waiters;
+ // let the destructor know we are unblocking
+ if (unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ last = 1;
+ first = 1;
+
+ // tell any waiting calls to enqueue() that one of them can proceed
+ if (enqueue_waiters > 1)
+ enqueue_sig.broadcast();
+
+ // let the destructor know we are unblocking
+ if (enabled == false && unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+ }
+
+ --enqueue_waiters;
+ return true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ dequeue (
+ T& item
+ )
+ {
+ auto_mutex M(m);
+ ++dequeue_waiters;
+
+ if (pipe_size == 0)
+ {
+ // notify wait_for_num_blocked_dequeues()
+ if (unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+
+ // notify any blocked enqueue_or_timeout() calls
+ if (enqueue_waiters > 0)
+ enqueue_sig.broadcast();
+ }
+
+ // wait until there is something in the pipe or we are disabled
+ while (pipe_size == 0 && enabled && dequeue_enabled &&
+ !(pipe_max_size == 0 && first == 0 && last == 0) )
+ dequeue_sig.wait();
+
+ if (enabled == false || dequeue_enabled == false)
+ {
+ --dequeue_waiters;
+ // let the destructor know we are unblocking
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ exchange(item,data[first]);
+
+ if (pipe_max_size > 0)
+ {
+ // set the appropriate values for first
+ first = (first+1)%pipe_max_size;
+
+ --pipe_size;
+ }
+ else
+ {
+ // let the enqueue waiting on us know that we took the
+ // item out already.
+ last = 1;
+ }
+
+ // wake up a call to enqueue() if there are any currently blocked
+ if (enqueue_waiters > 0)
+ enqueue_sig.broadcast();
+
+ --dequeue_waiters;
+ return true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ enqueue_or_timeout (
+ T& item,
+ unsigned long timeout
+ )
+ {
+ auto_mutex M(m);
+ ++enqueue_waiters;
+
+ // wait until there is room or we are disabled or
+ // we run out of time.
+ bool timed_out = false;
+ while (pipe_size == pipe_max_size && enabled && enqueue_enabled &&
+ !(pipe_max_size == 0 && dequeue_waiters > 0 && first == 1) )
+ {
+ if (timeout == 0 || enqueue_sig.wait_or_timeout(timeout) == false)
+ {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if (enabled == false || timed_out || enqueue_enabled == false)
+ {
+ --enqueue_waiters;
+ // let the destructor know we are unblocking
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ // set the appropriate values for first and last
+ if (pipe_size == 0)
+ {
+ first = 0;
+ last = 0;
+ }
+ else
+ {
+ last = (last+1)%pipe_max_size;
+ }
+
+
+ exchange(item,data[last]);
+
+ // wake up a call to dequeue() if there are any currently blocked
+ if (dequeue_waiters > 0)
+ dequeue_sig.signal();
+
+ if (pipe_max_size > 0)
+ {
+ ++pipe_size;
+ }
+ else
+ {
+ // wait for a dequeue to take the item out
+ while (last == 0 && enabled && enqueue_enabled)
+ enqueue_sig.wait();
+
+ if (last == 0 && (enabled == false || enqueue_enabled == false))
+ {
+ last = 1;
+ first = 1;
+
+ // no one dequeued this object to put it back into item
+ exchange(item,data[0]);
+
+ --enqueue_waiters;
+ // let the destructor know we are unblocking
+ if (unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ last = 1;
+ first = 1;
+
+ // tell any waiting calls to enqueue() that one of them can proceed
+ if (enqueue_waiters > 1)
+ enqueue_sig.broadcast();
+
+ // let the destructor know we are unblocking
+ if (enabled == false && unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+ }
+
+ --enqueue_waiters;
+ return true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ dequeue_or_timeout (
+ T& item,
+ unsigned long timeout
+ )
+ {
+ auto_mutex M(m);
+ ++dequeue_waiters;
+
+ if (pipe_size == 0)
+ {
+ // notify wait_for_num_blocked_dequeues()
+ if (unblock_sig_waiters > 0)
+ unblock_sig.broadcast();
+
+ // notify any blocked enqueue_or_timeout() calls
+ if (enqueue_waiters > 0)
+ enqueue_sig.broadcast();
+ }
+
+ bool timed_out = false;
+ // wait until there is something in the pipe or we are disabled or we timeout.
+ while (pipe_size == 0 && enabled && dequeue_enabled &&
+ !(pipe_max_size == 0 && first == 0 && last == 0) )
+ {
+ if (timeout == 0 || dequeue_sig.wait_or_timeout(timeout) == false)
+ {
+ timed_out = true;
+ break;
+ }
+ }
+
+ if (enabled == false || timed_out || dequeue_enabled == false)
+ {
+ --dequeue_waiters;
+ // let the destructor know we are unblocking
+ unblock_sig.broadcast();
+ return false;
+ }
+
+ exchange(item,data[first]);
+
+ if (pipe_max_size > 0)
+ {
+ // set the appropriate values for first
+ first = (first+1)%pipe_max_size;
+
+ --pipe_size;
+ }
+ else
+ {
+ // let the enqueue waiting on us know that we took the
+ // item out already.
+ last = 1;
+ }
+
+ // wake up a call to enqueue() if there are any currently blocked
+ if (enqueue_waiters > 0)
+ enqueue_sig.broadcast();
+
+ --dequeue_waiters;
+ return true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ wait_for_num_blocked_dequeues (
+ unsigned long num
+ )const
+ {
+ auto_mutex M(m);
+ ++unblock_sig_waiters;
+
+ while ( (dequeue_waiters < num || pipe_size != 0) && enabled && dequeue_enabled)
+ unblock_sig.wait();
+
+ // let the destructor know we are ending if it is blocked waiting
+ if (enabled == false)
+ unblock_sig.broadcast();
+
+ --unblock_sig_waiters;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ is_enqueue_enabled (
+ ) const
+ {
+ auto_mutex M(m);
+ return enqueue_enabled;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ disable_enqueue (
+ )
+ {
+ auto_mutex M(m);
+ enqueue_enabled = false;
+ enqueue_sig.broadcast();
+ }
+
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ enable_enqueue (
+ )
+ {
+ auto_mutex M(m);
+ enqueue_enabled = true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ bool pipe<T>::
+ is_dequeue_enabled (
+ ) const
+ {
+ auto_mutex M(m);
+ return dequeue_enabled;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ disable_dequeue (
+ )
+ {
+ auto_mutex M(m);
+ dequeue_enabled = false;
+ dequeue_sig.broadcast();
+ }
+
+
+// ----------------------------------------------------------------------------------------
+
+ template <
+ typename T
+ >
+ void pipe<T>::
+ enable_dequeue (
+ )
+ {
+ auto_mutex M(m);
+ dequeue_enabled = true;
+ }
+
+// ----------------------------------------------------------------------------------------
+
+}
+
+#endif // DLIB_PIPE_KERNEl_1_
+