diff options
Diffstat (limited to 'src/boost/libs/fiber/performance')
17 files changed, 1507 insertions, 0 deletions
diff --git a/src/boost/libs/fiber/performance/clock.hpp b/src/boost/libs/fiber/performance/clock.hpp new file mode 100644 index 00000000..00a1bb18 --- /dev/null +++ b/src/boost/libs/fiber/performance/clock.hpp @@ -0,0 +1,44 @@ + +// Copyright Oliver Kowalke 2009. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#ifndef CLOCK_H +#define CLOCK_H + +#include <algorithm> +#include <chrono> +#include <cstddef> +#include <cstdint> +#include <numeric> +#include <vector> + +#include <boost/assert.hpp> + +typedef std::chrono::steady_clock clock_type; +typedef clock_type::duration duration_type; +typedef clock_type::time_point time_point_type; + +struct clock_overhead +{ + std::uint64_t operator()() + { + time_point_type start( clock_type::now() ); + return ( clock_type::now() - start).count(); + } +}; + +duration_type overhead_clock() +{ + std::size_t iterations( 10); + std::vector< std::uint64_t > overhead( iterations, 0); + for ( std::size_t i = 0; i < iterations; ++i) + std::generate( + overhead.begin(), overhead.end(), + clock_overhead() ); + BOOST_ASSERT( overhead.begin() != overhead.end() ); + return duration_type( std::accumulate( overhead.begin(), overhead.end(), 0) / iterations); +} + +#endif // CLOCK_H diff --git a/src/boost/libs/fiber/performance/fiber/Jamfile.v2 b/src/boost/libs/fiber/performance/fiber/Jamfile.v2 new file mode 100644 index 00000000..7de178db --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/Jamfile.v2 @@ -0,0 +1,52 @@ + +# Copyright Oliver Kowalke 2009. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +# For more information, see http://www.boost.org/ + +import common ; +import feature ; +import indirect ; +import modules ; +import os ; +import toolset ; + +project boost/fiber/performance/fiber + : requirements + <library>/boost/fiber//boost_fiber + <library>/boost/fiber//boost_fiber_numa + <target-os>solaris:<linkflags>"-llgrp" + <target-os>windows:<define>_WIN32_WINNT=0x0601 + <toolset>gcc,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>gcc,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <toolset>clang,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>clang,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <link>static + <numa>on + <threading>multi + <optimization>speed + <variant>release + ; + +exe skynet_join : + skynet_join.cpp ; + +exe skynet_detach : + skynet_detach.cpp ; + +exe skynet_shared_join : + skynet_shared_join.cpp ; + +exe skynet_shared_detach : + skynet_shared_detach.cpp ; + +exe skynet_stealing_join : + skynet_stealing_join.cpp ; + +exe skynet_stealing_detach : + skynet_stealing_detach.cpp ; + +exe skynet_stealing_async : + skynet_stealing_async.cpp ; diff --git a/src/boost/libs/fiber/performance/fiber/barrier.hpp b/src/boost/libs/fiber/performance/fiber/barrier.hpp new file mode 100644 index 00000000..dcbd7e98 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/barrier.hpp @@ -0,0 +1,50 @@ + +// Copyright Oliver Kowalke 2013. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +#ifndef BARRIER_H +#define BARRIER_H + +#include <cstddef> +#include <condition_variable> +#include <mutex> + +#include <boost/assert.hpp> + +class barrier { +private: + std::size_t initial_; + std::size_t current_; + bool cycle_{ true }; + std::mutex mtx_{}; + std::condition_variable cond_{}; + +public: + explicit barrier( std::size_t initial) : + initial_{ initial }, + current_{ initial_ } { + BOOST_ASSERT ( 0 != initial); + } + + barrier( barrier const&) = delete; + barrier & operator=( barrier const&) = delete; + + bool wait() { + std::unique_lock< std::mutex > lk( mtx_); + const bool cycle = cycle_; + if ( 0 == --current_) { + cycle_ = ! cycle_; + current_ = initial_; + lk.unlock(); // no pessimization + cond_.notify_all(); + return true; + } else { + cond_.wait( lk, [&](){ return cycle != cycle_; }); + } + return false; + } +}; + +#endif // BARRIER_H diff --git a/src/boost/libs/fiber/performance/fiber/numa/Jamfile.v2 b/src/boost/libs/fiber/performance/fiber/numa/Jamfile.v2 new file mode 100644 index 00000000..30f3a7f9 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/numa/Jamfile.v2 @@ -0,0 +1,32 @@ + +# Copyright Oliver Kowalke 2009. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +# For more information, see http://www.boost.org/ + +import common ; +import feature ; +import indirect ; +import modules ; +import os ; +import toolset ; + +project boost/fiber/performance/fiber/numa + : requirements + <library>/boost/fiber//boost_fiber + <target-os>solaris:<linkflags>"-llgrp" + <target-os>windows:<define>_WIN32_WINNT=0x0601 + <toolset>gcc,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>gcc,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <toolset>clang,<segmented-stacks>on:<cxxflags>-fsplit-stack + <toolset>clang,<segmented-stacks>on:<cxxflags>-DBOOST_USE_SEGMENTED_STACKS + <link>static + <threading>multi + <optimization>speed + <variant>release + ; + +exe skynet_stealing_detach : + skynet_stealing_detach.cpp ; diff --git a/src/boost/libs/fiber/performance/fiber/numa/skynet_stealing_detach.cpp b/src/boost/libs/fiber/performance/fiber/numa/skynet_stealing_detach.cpp new file mode 100644 index 00000000..5d41b9c7 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/numa/skynet_stealing_detach.cpp @@ -0,0 +1,121 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <queue> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <random> +#include <sstream> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/numa/all.hpp> +#include <boost/predef.h> + +#include "../barrier.hpp" + +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using allocator_type = boost::fibers::fixedsize_stack; +using lock_type = std::unique_lock< std::mutex >; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +std::uint32_t hardware_concurrency( std::vector< boost::fibers::numa::node > const& topo) { + std::uint32_t cpus = 0; + for ( auto & node : topo) { + cpus += node.logical_cpus.size(); + } + return cpus; +} + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + boost::fibers::fiber{ boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div }.detach(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +void thread( std::uint32_t cpu_id, std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) { + boost::fibers::use_scheduling_algorithm< boost::fibers::numa::algo::work_stealing >( cpu_id, node_id, topo); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + std::vector< boost::fibers::numa::node > topo = boost::fibers::numa::topology(); + auto node = topo[0]; + auto main_cpu_id = * node.logical_cpus.begin(); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + std::vector< std::thread > threads; + for ( auto && node : topo) { + for ( std::uint32_t cpu_id : node.logical_cpus) { + // exclude main-thread + if ( main_cpu_id != cpu_id) { + threads.emplace_back( thread, cpu_id, node.id, std::cref( topo) ); + } + } + } + boost::fibers::use_scheduling_algorithm< boost::fibers::numa::algo::work_stealing >( main_cpu_id, node.id, topo); + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_detach.cpp b/src/boost/libs/fiber/performance/fiber/skynet_detach.cpp new file mode 100644 index 00000000..5e6d4052 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_detach.cpp @@ -0,0 +1,80 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <numeric> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/predef.h> + +using allocator_type = boost::fibers::fixedsize_stack; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + boost::fibers::fiber{ boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div }.detach(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +int main() { + try { + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + // Windows 10 and FreeBSD require a fiber stack of 8kb + // otherwise the stack gets exhausted + // stack requirements must be checked for other OS too +#if BOOST_OS_WINDOWS || BOOST_OS_BSD + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; +#else + allocator_type salloc{ allocator_type::traits_type::page_size() }; +#endif + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_join.cpp b/src/boost/libs/fiber/performance/fiber/skynet_join.cpp new file mode 100644 index 00000000..543ee03f --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_join.cpp @@ -0,0 +1,84 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <numeric> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/predef.h> + +using allocator_type = boost::fibers::fixedsize_stack; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + std::vector< boost::fibers::fiber > fibers; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + fibers.emplace_back( boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div); + } + for ( auto & f: fibers) { + f.join(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +int main() { + try { + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + // Windows 10 and FreeBSD require a fiber stack of 8kb + // otherwise the stack gets exhausted + // stack requirements must be checked for other OS too +#if BOOST_OS_WINDOWS || BOOST_OS_BSD + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; +#else + allocator_type salloc{ allocator_type::traits_type::page_size() }; +#endif + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_shared_detach.cpp b/src/boost/libs/fiber/performance/fiber/skynet_shared_detach.cpp new file mode 100644 index 00000000..12fd5653 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_shared_detach.cpp @@ -0,0 +1,109 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <deque> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/numa/pin_thread.hpp> +#include <boost/predef.h> + +#include "barrier.hpp" + +using allocator_type = boost::fibers::fixedsize_stack; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using lock_type = std::unique_lock< std::mutex >; +using time_point_type = clock_type::time_point; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + boost::fibers::fiber{ boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div }.detach(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +void thread( unsigned int idx, barrier * b) { + boost::fibers::numa::pin_thread( idx); + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); + b->wait(); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); + unsigned int n = std::thread::hardware_concurrency(); + barrier b( n); + boost::fibers::numa::pin_thread( n - 1); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + std::vector< std::thread > threads; + for ( unsigned int i = 1; i < n; ++i) { + threads.emplace_back( thread, i - 1, & b); + }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + b.wait(); + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_shared_join.cpp b/src/boost/libs/fiber/performance/fiber/skynet_shared_join.cpp new file mode 100644 index 00000000..c26530f2 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_shared_join.cpp @@ -0,0 +1,113 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <deque> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/fiber/numa/pin_thread.hpp> +#include <boost/predef.h> + +#include "barrier.hpp" + +using allocator_type = boost::fibers::fixedsize_stack; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using lock_type = std::unique_lock< std::mutex >; +using time_point_type = clock_type::time_point; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + std::vector< boost::fibers::fiber > fibers; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + fibers.emplace_back( boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div); + } + for ( auto & f: fibers) { + f.join(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +void thread( unsigned int idx, barrier * b) { + boost::fibers::numa::pin_thread( idx); + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); + b->wait(); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); + unsigned int n = std::thread::hardware_concurrency(); + barrier b( n); + boost::fibers::numa::pin_thread( n - 1); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + std::vector< std::thread > threads; + for ( unsigned int i = 1; i < n; ++i) { + threads.emplace_back( thread, i - 1, & b); + }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + b.wait(); + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_stealing_async.cpp b/src/boost/libs/fiber/performance/fiber/skynet_stealing_async.cpp new file mode 100644 index 00000000..3701253b --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_stealing_async.cpp @@ -0,0 +1,114 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <queue> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <random> +#include <sstream> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/predef.h> + +#include "barrier.hpp" + +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using allocator_type = boost::fibers::fixedsize_stack; +using lock_type = std::unique_lock< std::mutex >; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +// microbenchmark +std::uint64_t skynet(allocator_type& salloc, std::uint64_t num, std::uint64_t size, std::uint64_t div) { + if ( size != 1){ + size /= div; + + std::vector<boost::fibers::future<std::uint64_t> > results; + results.reserve( div); + + for ( std::uint64_t i = 0; i != div; ++i) { + std::uint64_t sub_num = num + i * size; + results.emplace_back(boost::fibers::async( + boost::fibers::launch::dispatch + , std::allocator_arg, salloc + , skynet + , std::ref( salloc), sub_num, size, div)); + } + + std::uint64_t sum = 0; + for ( auto& f : results) + sum += f.get(); + + return sum; + } + + return num; +} + +void thread( std::uint32_t thread_count) { + // thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + // count of logical ids + std::uint32_t thread_count = std::thread::hardware_concurrency(); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + std::vector< std::thread > threads; + for ( std::uint32_t i = 1 /* count main-thread */; i < thread_count; ++i) { + // spawn thread + threads.emplace_back( thread, thread_count); + } + // main-thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + time_point_type start{ clock_type::now() }; + result = skynet( salloc, 0, size, div); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_stealing_detach.cpp b/src/boost/libs/fiber/performance/fiber/skynet_stealing_detach.cpp new file mode 100644 index 00000000..fbc1b205 --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_stealing_detach.cpp @@ -0,0 +1,109 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <queue> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <random> +#include <sstream> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/predef.h> + +#include "barrier.hpp" + +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using allocator_type = boost::fibers::fixedsize_stack; +using lock_type = std::unique_lock< std::mutex >; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + boost::fibers::fiber{ boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div }.detach(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +void thread( std::uint32_t thread_count) { + // thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + // count of logical cpus + std::uint32_t thread_count = std::thread::hardware_concurrency(); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + std::vector< std::thread > threads; + for ( std::uint32_t i = 1 /* count main-thread */; i < thread_count; ++i) { + // spawn thread + threads.emplace_back( thread, thread_count); + } + // main-thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/fiber/skynet_stealing_join.cpp b/src/boost/libs/fiber/performance/fiber/skynet_stealing_join.cpp new file mode 100644 index 00000000..5acb1f7d --- /dev/null +++ b/src/boost/libs/fiber/performance/fiber/skynet_stealing_join.cpp @@ -0,0 +1,113 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cmath> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <queue> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <random> +#include <sstream> +#include <vector> + +#include <boost/fiber/all.hpp> +#include <boost/predef.h> + +#include "barrier.hpp" + +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; +using channel_type = boost::fibers::buffered_channel< std::uint64_t >; +using allocator_type = boost::fibers::fixedsize_stack; +using lock_type = std::unique_lock< std::mutex >; + +static bool done = false; +static std::mutex mtx{}; +static boost::fibers::condition_variable_any cnd{}; + +// microbenchmark +void skynet( allocator_type & salloc, channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + std::vector< boost::fibers::fiber > fibers; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + fibers.emplace_back( boost::fibers::launch::dispatch, + std::allocator_arg, salloc, + skynet, + std::ref( salloc), std::ref( rc), sub_num, size / div, div); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + for ( auto & f : fibers) { + f.join(); + } + } +} + +void thread( std::uint32_t thread_count) { + // thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + lock_type lk( mtx); + cnd.wait( lk, [](){ return done; }); + BOOST_ASSERT( done); +} + +int main() { + try { + // count of logical cpus + std::uint32_t thread_count = std::thread::hardware_concurrency(); + std::size_t size{ 1000000 }; + std::size_t div{ 10 }; + allocator_type salloc{ 2*allocator_type::traits_type::page_size() }; + std::uint64_t result{ 0 }; + channel_type rc{ 2 }; + std::vector< std::thread > threads; + for ( std::uint32_t i = 1 /* count main-thread */; i < thread_count; ++i) { + // spawn thread + threads.emplace_back( thread, thread_count); + } + // main-thread registers itself at work-stealing scheduler + boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( thread_count); + time_point_type start{ clock_type::now() }; + skynet( salloc, rc, 0, size, div); + result = rc.value_pop(); + if ( 499999500000 != result) { + throw std::runtime_error("invalid result"); + } + auto duration = clock_type::now() - start; + lock_type lk( mtx); + done = true; + lk.unlock(); + cnd.notify_all(); + for ( std::thread & t : threads) { + t.join(); + } + std::cout << "duration: " << duration.count() / 1000000 << " ms" << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/thread/Jamfile.v2 b/src/boost/libs/fiber/performance/thread/Jamfile.v2 new file mode 100644 index 00000000..baa18210 --- /dev/null +++ b/src/boost/libs/fiber/performance/thread/Jamfile.v2 @@ -0,0 +1,34 @@ + +# Copyright Oliver Kowalke 2009. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +# For more information, see http://www.boost.org/ + +import common ; +import feature ; +import indirect ; +import modules ; +import os ; +import toolset ; + +project boost/fiber/performance/thread + : requirements + <link>static + <threading>multi + <optimization>speed + <variant>release + ; + +exe skynet_async + : skynet_async.cpp + ; + +exe skynet_std + : skynet_std.cpp + ; + +exe skynet_pthread + : skynet_pthread.cpp + ; diff --git a/src/boost/libs/fiber/performance/thread/buffered_channel.hpp b/src/boost/libs/fiber/performance/thread/buffered_channel.hpp new file mode 100644 index 00000000..7f021dfc --- /dev/null +++ b/src/boost/libs/fiber/performance/thread/buffered_channel.hpp @@ -0,0 +1,228 @@ + +// Copyright Oliver Kowalke 2016. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +// based on Dmitry Vyukov's MPMC queue +// (http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue) + +#ifndef BUFFERED_CHANNEL_H +#define BUFFERED_CHANNEL_H + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <memory> +#include <mutex> +#include <stdexcept> +#include <type_traits> + +#include <boost/assert.hpp> +#include <boost/config.hpp> +#include <boost/fiber/detail/config.hpp> + +enum class channel_op_status { + success = 0, + empty, + full, + closed, + timeout +}; + +template< typename T > +class buffered_channel { +public: + typedef T value_type; + +private: + typedef typename std::aligned_storage< sizeof( T), alignof( T) >::type storage_type; + + struct alignas(cache_alignment) slot { + std::atomic< std::size_t > cycle{ 0 }; + storage_type storage{}; + + slot() = default; + }; + + // procuder cacheline + alignas(cache_alignment) std::atomic< std::size_t > producer_idx_{ 0 }; + // consumer cacheline + alignas(cache_alignment) std::atomic< std::size_t > consumer_idx_{ 0 }; + // shared write cacheline + alignas(cache_alignment) std::atomic_bool closed_{ false }; + mutable std::mutex mtx_{}; + std::condition_variable not_full_cnd_{}; + std::condition_variable not_empty_cnd_{}; + // shared read cacheline + alignas(cache_alignment) slot * slots_{ nullptr }; + std::size_t capacity_; + char pad_[cacheline_length]; + std::size_t waiting_consumer_{ 0 }; + + bool is_full_() { + std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; + return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx); + } + + bool is_empty_() { + std::size_t idx{ consumer_idx_.load( std::memory_order_relaxed) }; + return 0 > static_cast< std::intptr_t >( slots_[idx & (capacity_ - 1)].cycle.load( std::memory_order_acquire) ) - static_cast< std::intptr_t >( idx + 1); + } + + template< typename ValueType > + channel_op_status try_push_( ValueType && value) { + slot * s{ nullptr }; + std::size_t idx{ producer_idx_.load( std::memory_order_relaxed) }; + for (;;) { + s = & slots_[idx & (capacity_ - 1)]; + std::size_t cycle{ s->cycle.load( std::memory_order_acquire) }; + std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx) }; + if ( 0 == diff) { + if ( producer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { + break; + } + } else if ( 0 > diff) { + return channel_op_status::full; + } else { + idx = producer_idx_.load( std::memory_order_relaxed); + } + } + ::new ( static_cast< void * >( std::addressof( s->storage) ) ) value_type( std::forward< ValueType >( value) ); + s->cycle.store( idx + 1, std::memory_order_release); + return channel_op_status::success; + } + + channel_op_status try_value_pop_( slot *& s, std::size_t & idx) { + idx = consumer_idx_.load( std::memory_order_relaxed); + for (;;) { + s = & slots_[idx & (capacity_ - 1)]; + std::size_t cycle = s->cycle.load( std::memory_order_acquire); + std::intptr_t diff{ static_cast< std::intptr_t >( cycle) - static_cast< std::intptr_t >( idx + 1) }; + if ( 0 == diff) { + if ( consumer_idx_.compare_exchange_weak( idx, idx + 1, std::memory_order_relaxed) ) { + break; + } + } else if ( 0 > diff) { + return channel_op_status::empty; + } else { + idx = consumer_idx_.load( std::memory_order_relaxed); + } + } + // incrementing the slot cycle must be deferred till the value has been consumed + // slot cycle tells procuders that the cell can be re-used (store new value) + return channel_op_status::success; + } + + channel_op_status try_pop_( value_type & value) { + slot * s{ nullptr }; + std::size_t idx{ 0 }; + channel_op_status status{ try_value_pop_( s, idx) }; + if ( channel_op_status::success == status) { + value = std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ); + s->cycle.store( idx + capacity_, std::memory_order_release); + } + return status; + } + +public: + explicit buffered_channel( std::size_t capacity) : + capacity_{ capacity } { + if ( 0 == capacity_ || 0 != ( capacity_ & (capacity_ - 1) ) ) { + throw std::runtime_error{ "boost fiber: buffer capacity is invalid" }; + } + slots_ = new slot[capacity_](); + for ( std::size_t i = 0; i < capacity_; ++i) { + slots_[i].cycle.store( i, std::memory_order_relaxed); + } + } + + ~buffered_channel() { + close(); + for (;;) { + slot * s{ nullptr }; + std::size_t idx{ 0 }; + if ( channel_op_status::success == try_value_pop_( s, idx) ) { + reinterpret_cast< value_type * >( std::addressof( s->storage) )->~value_type(); + s->cycle.store( idx + capacity_, std::memory_order_release); + } else { + break; + } + } + delete [] slots_; + } + + buffered_channel( buffered_channel const&) = delete; + buffered_channel & operator=( buffered_channel const&) = delete; + + bool is_closed() const noexcept { + return closed_.load( std::memory_order_acquire); + } + + void close() noexcept { + std::unique_lock< std::mutex > lk{ mtx_ }; + closed_.store( true, std::memory_order_release); + not_full_cnd_.notify_all(); + not_empty_cnd_.notify_all(); + } + + channel_op_status push( value_type const& value) { + for (;;) { + if ( is_closed() ) { + return channel_op_status::closed; + } + channel_op_status status{ try_push_( value) }; + if ( channel_op_status::success == status) { + std::unique_lock< std::mutex > lk{ mtx_ }; + if ( 0 < waiting_consumer_) { + not_empty_cnd_.notify_one(); + } + return status; + } else if ( channel_op_status::full == status) { + std::unique_lock< std::mutex > lk{ mtx_ }; + if ( is_closed() ) { + return channel_op_status::closed; + } + if ( ! is_full_() ) { + continue; + } + not_full_cnd_.wait( lk, [this]{ return is_closed() || ! is_full_(); }); + } else { + BOOST_ASSERT( channel_op_status::closed == status); + return status; + } + } + } + + value_type value_pop() { + for (;;) { + slot * s{ nullptr }; + std::size_t idx{ 0 }; + channel_op_status status{ try_value_pop_( s, idx) }; + if ( channel_op_status::success == status) { + value_type value{ std::move( * reinterpret_cast< value_type * >( std::addressof( s->storage) ) ) }; + s->cycle.store( idx + capacity_, std::memory_order_release); + not_full_cnd_.notify_one(); + return std::move( value); + } else if ( channel_op_status::empty == status) { + std::unique_lock< std::mutex > lk{ mtx_ }; + ++waiting_consumer_; + if ( is_closed() ) { + throw std::runtime_error{ "boost fiber: channel is closed" }; + } + if ( ! is_empty_() ) { + continue; + } + not_empty_cnd_.wait( lk, [this](){ return is_closed() || ! is_empty_(); }); + --waiting_consumer_; + } else { + BOOST_ASSERT( channel_op_status::closed == status); + throw std::runtime_error{ "boost fiber: channel is closed" }; + } + } + } +}; + +#endif // BUFFERED_CHANNEL_H diff --git a/src/boost/libs/fiber/performance/thread/skynet_async.cpp b/src/boost/libs/fiber/performance/thread/skynet_async.cpp new file mode 100644 index 00000000..b752e020 --- /dev/null +++ b/src/boost/libs/fiber/performance/thread/skynet_async.cpp @@ -0,0 +1,76 @@ + +// Copyright Oliver Kowalke 2015. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// based on https://github.com/atemerev/skynet from Alexander Temerev + +#include <algorithm> +#include <cassert> +#include <chrono> +#include <condition_variable> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <queue> +#include <future> +#include <iostream> +#include <memory> +#include <mutex> +#include <numeric> +#include <random> +#include <sstream> +#include <vector> + +#include "buffered_channel.hpp" + +using channel_type = buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; + +// microbenchmark +std::uint64_t skynet( std::uint64_t num, std::uint64_t size, std::uint64_t div) +{ + if ( size != 1){ + size /= div; + + std::vector<std::future<std::uint64_t> > results; + results.reserve( div); + + for ( std::uint64_t i = 0; i != div; ++i) { + std::uint64_t sub_num = num + i * size; + results.emplace_back( + std::async( skynet, sub_num, size, div) ); + } + + std::uint64_t sum = 0; + for ( auto& f : results) + sum += f.get(); + + return sum; + } + + return num; +} + +int main() { + try { + std::size_t size{ 10000 }; + std::size_t div{ 10 }; + std::uint64_t result{ 0 }; + duration_type duration{ duration_type::zero() }; + time_point_type start{ clock_type::now() }; + result = skynet( 0, size, div); + duration = clock_type::now() - start; + std::cout << "Result: " << result << " in " << duration.count() / 1000000 << " ms" << std::endl; + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/thread/skynet_pthread.cpp b/src/boost/libs/fiber/performance/thread/skynet_pthread.cpp new file mode 100644 index 00000000..43f55693 --- /dev/null +++ b/src/boost/libs/fiber/performance/thread/skynet_pthread.cpp @@ -0,0 +1,90 @@ +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <numeric> +#include <stdexcept> +#include <vector> + +extern "C" { +#include <pthread.h> +#include <signal.h> +} + +#include "buffered_channel.hpp" + +using channel_type = buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; + +struct thread_args { + channel_type & c; + std::size_t num; + std::size_t size; + std::size_t div; +}; + +// microbenchmark +void * skynet( void * vargs) { + thread_args * args = static_cast< thread_args * >( vargs); + if ( 1 == args->size) { + args->c.push( args->num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < args->div; ++i) { + auto sub_num = args->num + i * args->size / args->div; + auto sub_size = args->size / args->div; + auto size = 8 * 1024; + pthread_attr_t tattr; + if ( 0 != ::pthread_attr_init( & tattr) ) { + std::runtime_error("pthread_attr_init() failed"); + } + if ( 0 != ::pthread_attr_setstacksize( & tattr, size) ) { + std::runtime_error("pthread_attr_setstacksize() failed"); + } + thread_args * targs = new thread_args{ rc, sub_num, sub_size, args->div }; + pthread_t tid; + if ( 0 != ::pthread_create( & tid, & tattr, & skynet, targs) ) { + std::runtime_error("pthread_create() failed"); + } + if ( 0 != ::pthread_detach( tid) ) { + std::runtime_error("pthread_detach() failed"); + } + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < args->div; ++i) { + sum += rc.value_pop(); + } + args->c.push( sum); + } + delete args; + return nullptr; +} + +int main() { + try { + std::size_t size{ 10000 }; + std::size_t div{ 10 }; + std::uint64_t result{ 0 }; + duration_type duration{ duration_type::zero() }; + channel_type rc{ 2 }; + thread_args * targs = new thread_args{ rc, 0, size, div }; + time_point_type start{ clock_type::now() }; + skynet( targs); + result = rc.value_pop(); + duration = clock_type::now() - start; + std::cout << "Result: " << result << " in " << duration.count() / 1000000 << " ms" << std::endl; + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} diff --git a/src/boost/libs/fiber/performance/thread/skynet_std.cpp b/src/boost/libs/fiber/performance/thread/skynet_std.cpp new file mode 100644 index 00000000..93248f2e --- /dev/null +++ b/src/boost/libs/fiber/performance/thread/skynet_std.cpp @@ -0,0 +1,58 @@ +#include <algorithm> +#include <cassert> +#include <chrono> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <iostream> +#include <memory> +#include <numeric> +#include <thread> +#include <vector> + +#include "buffered_channel.hpp" + +using channel_type = buffered_channel< std::uint64_t >; +using clock_type = std::chrono::steady_clock; +using duration_type = clock_type::duration; +using time_point_type = clock_type::time_point; + +// microbenchmark +void skynet( channel_type & c, std::size_t num, std::size_t size, std::size_t div) { + if ( 1 == size) { + c.push( num); + } else { + channel_type rc{ 16 }; + for ( std::size_t i = 0; i < div; ++i) { + auto sub_num = num + i * size / div; + std::thread{ skynet, std::ref( rc), sub_num, size / div, div }.detach(); + } + std::uint64_t sum{ 0 }; + for ( std::size_t i = 0; i < div; ++i) { + sum += rc.value_pop(); + } + c.push( sum); + } +} + +int main() { + try { + std::size_t size{ 10000 }; + std::size_t div{ 10 }; + std::uint64_t result{ 0 }; + duration_type duration{ duration_type::zero() }; + channel_type rc{ 2 }; + time_point_type start{ clock_type::now() }; + skynet( rc, 0, size, div); + result = rc.value_pop(); + duration = clock_type::now() - start; + std::cout << "Result: " << result << " in " << duration.count() / 1000000 << " ms" << std::endl; + std::cout << "done." << std::endl; + return EXIT_SUCCESS; + } catch ( std::exception const& e) { + std::cerr << "exception: " << e.what() << std::endl; + } catch (...) { + std::cerr << "unhandled exception" << std::endl; + } + return EXIT_FAILURE; +} |