summary | refs | log | tree | commit | diff | stats
path: root/src/boost/libs/thread/example/parallel_accumulate.cpp
blob: acd72017eda92dada39b2087ee824d8ad7f48dc5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (C) 2014 Vicente Botet
//
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <boost/config.hpp>

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS
#define BOOST_THREAD_USES_LOG_THREAD_ID
#define BOOST_THREAD_QUEUE_DEPRECATE_OLD
#if ! defined  BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/future.hpp>

#include <numeric>
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

#if defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)

// Function object that sums the half-open range [first,last).
//
// The running total starts from a value-initialized T and each element is
// added to it in iteration order; the final total is returned by value.
template<typename Iterator,typename T>
struct accumulate_block
{
    //typedef T result_type;
    T operator()(Iterator first,Iterator last)
    {
        T total=T();
        // Left fold over the range, one element at a time.
        for(;first!=last;++first)
        {
            total=total+*first;
        }
        return total;
    }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
    unsigned long const length=static_cast<unsigned long>(std::distance(first,last));

    if(!length)
        return init;

    unsigned long const block_size=25;
    unsigned long const num_blocks=(length+block_size-1)/block_size;

    boost::csbl::vector<boost::future<T> > futures(num_blocks-1);
    boost::basic_thread_pool pool;

    Iterator block_start=first;
    for(unsigned long i=0;i<(num_blocks-1);++i)
    {
        Iterator block_end=block_start;
        std::advance(block_end,block_size);
        futures[i]=boost::async(pool, accumulate_block<Iterator,T>(), block_start, block_end);
        block_start=block_end;
    }
    T last_result=accumulate_block<Iterator,T>()(block_start,last);
    T result=init;
    for(unsigned long i=0;i<(num_blocks-1);++i)
    {
        result+=futures[i].get();
    }
    result += last_result;
    return result;
}


// Example driver: builds a vector of 1001 ones, sums it with
// parallel_accumulate starting from 0, and prints the result to standard
// output.
//
// Returns 0 on success, 1 if a std::exception was caught (its what() text is
// printed), and 2 if any other exception was caught.
int main()
{
  try
  {
    const int s = 1001;
    // Fill-construct the input instead of reserve() plus a push_back loop.
    std::vector<int> vec(s, 1);
    int r = parallel_accumulate(vec.begin(), vec.end(),0);
    std::cout << r << std::endl;

  }
  // The handler only reads the exception, so bind it by const reference.
  catch (const std::exception& ex)
  {
    std::cout << "ERROR= " << ex.what() << std::endl;
    return 1;
  }
  catch (...)
  {
    std::cout << " ERROR= exception thrown" << std::endl;
    return 2;
  }
  return 0;
}

#else
///#warning "This compiler doesn't support variadics"
// Fallback entry point used when BOOST_THREAD_PROVIDES_VARIADIC_THREAD is not
// defined: the example itself is compiled out and the program simply exits
// with status 0.
int main()
{
  return 0;
}
#endif