summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/fiber/examples/work_sharing.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-27 18:24:20 +0000
commit483eb2f56657e8e7f419ab1a4fab8dce9ade8609 (patch)
treee5d88d25d870d5dedacb6bbdbe2a966086a0a5cf /src/boost/libs/fiber/examples/work_sharing.cpp
parentInitial commit. (diff)
downloadceph-upstream.tar.xz
ceph-upstream.zip
Adding upstream version 14.2.21.upstream/14.2.21upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/boost/libs/fiber/examples/work_sharing.cpp')
-rw-r--r--src/boost/libs/fiber/examples/work_sharing.cpp130
1 files changed, 130 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/examples/work_sharing.cpp b/src/boost/libs/fiber/examples/work_sharing.cpp
new file mode 100644
index 00000000..f5e31e51
--- /dev/null
+++ b/src/boost/libs/fiber/examples/work_sharing.cpp
@@ -0,0 +1,130 @@
+// Copyright Nat Goodspeed + Oliver Kowalke 2015.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <chrono>
+#include <condition_variable>
+#include <cstddef>
+#include <deque>
+#include <iomanip>
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <thread>
+
+#include <boost/assert.hpp>
+
+#include <boost/fiber/all.hpp>
+
+#include <boost/fiber/detail/thread_barrier.hpp>
+
+static std::size_t fiber_count{ 0 };
+static std::mutex mtx_count{};
+static boost::fibers::condition_variable_any cnd_count{};
+typedef std::unique_lock< std::mutex > lock_type;
+
+/*****************************************************************************
+* example fiber function
+*****************************************************************************/
+//[fiber_fn_ws
+void whatevah( char me) {
+ try {
+ std::thread::id my_thread = std::this_thread::get_id(); /*< get ID of initial thread >*/
+ {
+ std::ostringstream buffer;
+ buffer << "fiber " << me << " started on thread " << my_thread << '\n';
+ std::cout << buffer.str() << std::flush;
+ }
+ for ( unsigned i = 0; i < 10; ++i) { /*< loop ten times >*/
+ boost::this_fiber::yield(); /*< yield to other fibers >*/
+ std::thread::id new_thread = std::this_thread::get_id(); /*< get ID of current thread >*/
+ if ( new_thread != my_thread) { /*< test if fiber was migrated to another thread >*/
+ my_thread = new_thread;
+ std::ostringstream buffer;
+ buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
+ std::cout << buffer.str() << std::flush;
+ }
+ }
+ } catch ( ... ) {
+ }
+ lock_type lk( mtx_count);
+ if ( 0 == --fiber_count) { /*< Decrement fiber counter for each completed fiber. >*/
+ lk.unlock();
+ cnd_count.notify_all(); /*< Notify all fibers waiting on `cnd_count`. >*/
+ }
+}
+//]
+
+/*****************************************************************************
+* example thread function
+*****************************************************************************/
+//[thread_fn_ws
+void thread( boost::fibers::detail::thread_barrier * b) {
+ std::ostringstream buffer;
+ buffer << "thread started " << std::this_thread::get_id() << std::endl;
+ std::cout << buffer.str() << std::flush;
+ boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
+ Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to
+ join the work sharing.
+ >*/
+ b->wait(); /*< sync with other threads: allow them to start processing >*/
+ lock_type lk( mtx_count);
+ cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
+ Suspend main fiber and resume worker fibers in the meanwhile.
+ Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
+ if all worker fibers are complete.
+ >*/
+ BOOST_ASSERT( 0 == fiber_count);
+}
+//]
+
+/*****************************************************************************
+* main()
+*****************************************************************************/
+int main( int argc, char *argv[]) {
+ std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
+//[main_ws
+ boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
+ Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread
+ too, so each new fiber gets launched into the shared pool.
+ >*/
+
+ for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
+ Launch a number of worker fibers; each worker fiber picks up a character
+ that is passed as parameter to fiber-function `whatevah`.
+ Each worker fiber gets detached.
+ >*/
+ boost::fibers::fiber([c](){ whatevah( c); }).detach();
+ ++fiber_count; /*< Increment fiber counter for each new fiber. >*/
+ }
+ boost::fibers::detail::thread_barrier b( 4);
+ std::thread threads[] = { /*<
+ Launch a couple of threads that join the work sharing.
+ >*/
+ std::thread( thread, & b),
+ std::thread( thread, & b),
+ std::thread( thread, & b)
+ };
+ b.wait(); /*< sync with other threads: allow them to start processing >*/
+ {
+ lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count);
+ cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
+ Suspend main fiber and resume worker fibers in the meanwhile.
+ Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
+ if all worker fibers are complete.
+ >*/
+ } /*<
+ Releasing lock of mtx_count is required before joining the threads, otherwise
+ the other threads would be blocked inside condition_variable::wait() and
+ would never return (deadlock).
+ >*/
+ BOOST_ASSERT( 0 == fiber_count);
+ for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
+ t.join();
+ }
+//]
+ std::cout << "done." << std::endl;
+ return EXIT_SUCCESS;
+}