diff options
Diffstat (limited to 'src/boost/libs/mpi/example/generate_collect_optional.cpp')
-rw-r--r-- | src/boost/libs/mpi/example/generate_collect_optional.cpp | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/src/boost/libs/mpi/example/generate_collect_optional.cpp b/src/boost/libs/mpi/example/generate_collect_optional.cpp new file mode 100644 index 00000000..3aa3888c --- /dev/null +++ b/src/boost/libs/mpi/example/generate_collect_optional.cpp @@ -0,0 +1,113 @@ +// Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com> + +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) + +// An example using Boost.MPI's split() operation on communicators to +// create separate data-generating processes and data-collecting +// processes using boost::optional for broadcasting. +#include <boost/mpi.hpp> +#include <iostream> +#include <cstdlib> +#include <boost/serialization/vector.hpp> +#include <boost/serialization/optional.hpp> +namespace mpi = boost::mpi; + +enum message_tags { msg_data_packet, msg_finished }; + +void generate_data(mpi::communicator local, mpi::communicator world) +{ + using std::srand; + using std::rand; + + // The rank of the collector within the world communicator + int master_collector = local.size(); + + srand(time(0) + world.rank()); + + // Send out several blocks of random data to the collectors. + int num_data_blocks = rand() % 3 + 1; + for (int block = 0; block < num_data_blocks; ++block) { + // Generate some random dataa + int num_samples = rand() % 1000; + std::vector<int> data; + for (int i = 0; i < num_samples; ++i) { + data.push_back(rand()); + } + + // Send our data to the master collector process. + std::cout << "Generator #" << local.rank() << " sends some data..." + << std::endl; + world.send(master_collector, msg_data_packet, data); + } + + // Wait for all of the generators to complete + (local.barrier)(); + + // The first generator will send the message to the master collector + // indicating that we're done. + if (local.rank() == 0) + world.send(master_collector, msg_finished); +} + +void collect_data(mpi::communicator local, mpi::communicator world) +{ + // The rank of the collector within the world communicator + int master_collector = world.size() - local.size(); + + if (world.rank() == master_collector) { + while (true) { + // Wait for a message + mpi::status msg = world.probe(); + if (msg.tag() == msg_data_packet) { + // Receive the packet of data into a boost::optional + boost::optional<std::vector<int> > data; + data = std::vector<int>(); + world.recv(msg.source(), msg.source(), *data); + + // Broadcast the actual data. + broadcast(local, data, 0); + } else if (msg.tag() == msg_finished) { + // Receive the message + world.recv(msg.source(), msg.tag()); + + // Broadcast to each collector to tell them we've finished. + boost::optional<std::vector<int> > data; + broadcast(local, data, 0); + break; + } + } + } else { + boost::optional<std::vector<int> > data; + do { + // Wait for a broadcast from the master collector + broadcast(local, data, 0); + if (data) { + std::cout << "Collector #" << local.rank() + << " is processing data." << std::endl; + } + } while (data); + } +} + +int main(int argc, char* argv[]) +{ + mpi::environment env(argc, argv); + mpi::communicator world; + + if (world.size() < 4) { + if (world.rank() == 0) { + std::cerr << "Error: this example requires at least 4 processes." + << std::endl; + } + env.abort(-1); + } + + bool is_generator = world.rank() < 2 * world.size() / 3; + mpi::communicator local = world.split(is_generator? 0 : 1); + if (is_generator) generate_data(local, world); + else collect_data(local, world); + + return 0; +} |