summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/mpi/example/parallel_example.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/boost/libs/mpi/example/parallel_example.cpp')
-rw-r--r--src/boost/libs/mpi/example/parallel_example.cpp196
1 files changed, 196 insertions, 0 deletions
diff --git a/src/boost/libs/mpi/example/parallel_example.cpp b/src/boost/libs/mpi/example/parallel_example.cpp
new file mode 100644
index 00000000..00347d51
--- /dev/null
+++ b/src/boost/libs/mpi/example/parallel_example.cpp
@@ -0,0 +1,196 @@
+// Copyright (C) 2005-2006 Matthias Troyer
+
+// Use, modification and distribution is subject to the Boost Software
+// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+// An example of a parallel Monte Carlo simulation using some nodes to produce
+// data and others to aggregate the data
+#include <iostream>
+
+#include <boost/mpi.hpp>
+#include <boost/random/parallel.hpp>
+#include <boost/random.hpp>
+#include <boost/foreach.hpp>
+#include <iostream>
+#include <cstdlib>
+
+namespace mpi = boost::mpi;
+
+enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};
+
+
+void calculate_samples(int sample_length)
+{
+ int num_samples = 100;
+ std::vector<double> sample(sample_length);
+
+ // setup communicator by splitting
+
+ mpi::communicator world;
+ mpi::communicator calculate_communicator = world.split(0);
+
+ unsigned int num_calculate_ranks = calculate_communicator.size();
+
+ // the master of the accumulaion ranks is the first of them, hence
+ // with a rank just one after the last calculation rank
+ int master_accumulate_rank = num_calculate_ranks;
+
+ // the master of the calculation ranks sends the skeleton of the sample
+ // to the master of the accumulation ranks
+
+ if (world.rank()==0)
+ world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));
+
+ // next we extract the content of the sample vector, to be used in sending
+ // the content later on
+
+ mpi::content sample_content = mpi::get_content(sample);
+
+ // now intialize the parallel random number generator
+
+ boost::lcg64 engine(
+ boost::random::stream_number = calculate_communicator.rank(),
+ boost::random::total_streams = calculate_communicator.size()
+ );
+
+ boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
+ rng(engine,boost::uniform_real<>());
+
+ for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {
+
+ // calculate sample by filling the vector with random numbers
+ // note that std::generate will not work since it takes the generator
+ // by value, and boost::ref cannot be used as a generator.
+ // boost::ref should be fixed so that it can be used as generator
+
+ BOOST_FOREACH(double& x, sample)
+ x = rng();
+
+ // send sample to accumulation ranks
+ // Ideally we want to do this as a broadcast with an inter-communicator
+ // between the calculation and accumulation ranks. MPI2 should support
+ // this, but here we present an MPI1 compatible solution.
+
+ // send content of sample to first (master) accumulation process
+
+ world.send(master_accumulate_rank,sample_tag,sample_content);
+
+ // gather some results from all calculation ranks
+
+ double local_result = sample[0];
+ std::vector<double> gathered_results(calculate_communicator.size());
+ mpi::all_gather(calculate_communicator,local_result,gathered_results);
+ }
+
+ // we are done: the master tells the accumulation ranks to quit
+ if (world.rank()==0)
+ world.send(master_accumulate_rank,quit_tag);
+}
+
+
+
+void accumulate_samples()
+{
+ std::vector<double> sample;
+
+ // setup the communicator for all accumulation ranks by splitting
+
+ mpi::communicator world;
+ mpi::communicator accumulate_communicator = world.split(1);
+
+ bool is_master_accumulate_rank = accumulate_communicator.rank()==0;
+
+ // the master receives the sample skeleton
+
+ if (is_master_accumulate_rank)
+ world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));
+
+ // and broadcasts it to all accumulation ranks
+ mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
+
+ // next we extract the content of the sample vector, to be used in receiving
+ // the content later on
+
+ mpi::content sample_content = mpi::get_content(sample);
+
+ // accumulate until quit is called
+ double sum=0.;
+ while (true) {
+
+
+ // the accumulation master checks whether we should quit
+ if (world.iprobe(0,quit_tag)) {
+ world.recv(0,quit_tag);
+ for (int i=1; i<accumulate_communicator.size();++i)
+ accumulate_communicator.send(i,quit_tag);
+ std::cout << sum << "\n";
+ break; // We're done
+ }
+
+ // the otehr accumulation ranks check whether we should quit
+ if (accumulate_communicator.iprobe(0,quit_tag)) {
+ accumulate_communicator.recv(0,quit_tag);
+ std::cout << sum << "\n";
+ break; // We're done
+ }
+
+ // check whether the master accumulation rank has received a sample
+ if (world.iprobe(mpi::any_source,sample_tag)) {
+ BOOST_ASSERT(is_master_accumulate_rank);
+
+ // receive the content
+ world.recv(mpi::any_source,sample_tag,sample_content);
+
+ // now we need to braodcast
+ // the problam is we do not have a non-blocking broadcast that we could
+ // abort if we receive a quit message from the master. We thus need to
+ // first tell all accumulation ranks to start a broadcast. If the sample
+ // is small, we could just send the sample in this message, but here we
+ // optimize the code for large samples, so that the overhead of these
+ // sends can be ignored, and we count on an optimized broadcast
+ // implementation with O(log N) complexity
+
+ for (int i=1; i<accumulate_communicator.size();++i)
+ accumulate_communicator.send(i,sample_broadcast_tag);
+
+ // now broadcast the contents of the sample to all accumulate ranks
+ mpi::broadcast(accumulate_communicator,sample_content,0);
+
+ // and handle the sample by summing the appropriate value
+ sum += sample[0];
+ }
+
+ // the other accumulation ranks wait for a mesage to start the broadcast
+ if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
+ BOOST_ASSERT(!is_master_accumulate_rank);
+
+ accumulate_communicator.recv(0,sample_broadcast_tag);
+
+ // receive broadcast of the sample contents
+ mpi::broadcast(accumulate_communicator,sample_content,0);
+
+ // and handle the sample
+
+ // and handle the sample by summing the appropriate value
+ sum += sample[accumulate_communicator.rank()];
+ }
+ }
+}
+
+int main(int argc, char** argv)
+{
+ mpi::environment env(argc, argv);
+ mpi::communicator world;
+
+ // half of the processes generate, the others accumulate
+ // the sample size is just the number of accumulation ranks
+ if (world.rank() < world.size()/2)
+ calculate_samples(world.size()-world.size()/2);
+ else
+ accumulate_samples();
+
+ return 0;
+}
+
+