summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/mpi/example/parallel_example.cpp
blob: 00347d51cabd36c33019ca78c9fb7e6f8e46b86b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
#include <iostream>

#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <iostream>
#include <cstdlib>

namespace mpi = boost::mpi;

enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};


void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // setup communicator by splitting

  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);

  unsigned int num_calculate_ranks = calculate_communicator.size();
  
  // the master of the accumulaion ranks is the first of them, hence
  // with a rank just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;
  
  // the master of the calculation ranks sends the skeleton of the sample 
  // to the master of the accumulation ranks

  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));
  
  // next we extract the content of the sample vector, to be used in sending
  // the content later on
  
  mpi::content sample_content = mpi::get_content(sample);
  
  // now intialize the parallel random number generator
  
  boost::lcg64 engine(
        boost::random::stream_number = calculate_communicator.rank(),
        boost::random::total_streams = calculate_communicator.size()
      );
      
  boost::variate_generator<boost::lcg64&,boost::uniform_real<> > 
    rng(engine,boost::uniform_real<>());
    
  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {
    
    // calculate sample by filling the vector with random numbers
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as generator
    
    BOOST_FOREACH(double& x, sample)
      x = rng();
    
    // send sample to accumulation ranks
    // Ideally we want to do this as a broadcast with an inter-communicator 
    // between the calculation and accumulation ranks. MPI2 should support 
    // this, but here we present an MPI1 compatible solution.
    
    // send content of sample to first (master) accumulation process
    
    world.send(master_accumulate_rank,sample_tag,sample_content);
    
    // gather some results from all calculation ranks
    
    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
  }
  
  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}



void accumulate_samples()
{
  std::vector<double> sample;

  // setup the communicator for all accumulation ranks by splitting

  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton
  
  if (is_master_accumulate_rank) 
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));
    
  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
  
  // next we extract the content of the sample vector, to be used in receiving
  // the content later on
  
  mpi::content sample_content = mpi::get_content(sample);
  
  // accumulate until quit is called
  double sum=0.;
  while (true) {
  
    
    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the otehr accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }
    
    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);
      
      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);
      
      // now we need to braodcast
      // the problam is we do not have a non-blocking broadcast that we could 
      // abort if we receive a quit message from the master. We thus need to
      // first tell all accumulation ranks to start a broadcast. If the sample
      // is small, we could just send the sample in this message, but here we
      // optimize the code for large samples, so that the overhead of these
      // sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity

      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);
      
      // now broadcast the contents of the sample to all accumulate ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);
      
      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }
  
    // the other accumulation ranks wait for a mesage to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);
      
      accumulate_communicator.recv(0,sample_broadcast_tag);
      
      // receive broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);
      
      // and handle the sample
      
      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate, the others accumulate
  // the sample size is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}