summaryrefslogtreecommitdiffstats
path: root/src/boost/libs/interprocess/test/message_queue_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/boost/libs/interprocess/test/message_queue_test.cpp')
-rw-r--r--src/boost/libs/interprocess/test/message_queue_test.cpp376
1 files changed, 376 insertions, 0 deletions
diff --git a/src/boost/libs/interprocess/test/message_queue_test.cpp b/src/boost/libs/interprocess/test/message_queue_test.cpp
new file mode 100644
index 00000000..fe76cad5
--- /dev/null
+++ b/src/boost/libs/interprocess/test/message_queue_test.cpp
@@ -0,0 +1,376 @@
+//////////////////////////////////////////////////////////////////////////////
+//
+// (C) Copyright Ion Gaztanaga 2004-2012. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// See http://www.boost.org/libs/interprocess for documentation.
+//
+//////////////////////////////////////////////////////////////////////////////
+
+#include <boost/interprocess/detail/config_begin.hpp>
+#include <boost/interprocess/ipc/message_queue.hpp>
+#include <boost/interprocess/managed_external_buffer.hpp>
+#include <boost/interprocess/managed_heap_memory.hpp>
+#include <boost/interprocess/containers/map.hpp>
+#include <boost/interprocess/containers/set.hpp>
+#include <boost/interprocess/allocators/node_allocator.hpp>
+#include <boost/interprocess/detail/os_thread_functions.hpp>
+// intrusive/detail
+#include <boost/intrusive/detail/minimal_pair_header.hpp>
+#include <boost/intrusive/detail/minimal_less_equal_header.hpp>
+
+#include <boost/move/unique_ptr.hpp>
+
+#include <cstddef>
+#include <memory>
+#include <iostream>
+#include <vector>
+#include <stdexcept>
+#include <limits>
+
+#include "get_process_id_name.hpp"
+
+////////////////////////////////////////////////////////////////////////////////
+// //
+// This example tests the process shared message queue. //
+// //
+////////////////////////////////////////////////////////////////////////////////
+
+using namespace boost::interprocess;
+
+//This test inserts messages with different priority and marks them with a
+//time-stamp to check if receiver obtains highest priority messages first and
+//messages with same priority are received in fifo order
+bool test_priority_order()
+{
+ message_queue::remove(test::get_process_id_name());
+ {
+ message_queue mq1
+ (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t)),
+ mq2
+ (open_or_create, test::get_process_id_name(), 100, sizeof(std::size_t));
+
+ //We test that the queue is ordered by priority and in the
+ //same priority, is a FIFO
+ message_queue::size_type recvd = 0;
+ unsigned int priority = 0;
+ std::size_t tstamp;
+ unsigned int priority_prev;
+ std::size_t tstamp_prev;
+
+ //We will send 100 message with priority 0-9
+ //The message will contain the timestamp of the message
+ for(std::size_t i = 0; i < 100; ++i){
+ tstamp = i;
+ mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(i%10));
+ }
+
+ priority_prev = (std::numeric_limits<unsigned int>::max)();
+ tstamp_prev = 0;
+
+ //Receive all messages and test those are ordered
+ //by priority and by FIFO in the same priority
+ for(std::size_t i = 0; i < 100; ++i){
+ mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
+ if(priority > priority_prev)
+ return false;
+ if(priority == priority_prev &&
+ tstamp <= tstamp_prev){
+ return false;
+ }
+ priority_prev = priority;
+ tstamp_prev = tstamp;
+ }
+
+ //Now retry it with different priority order
+ for(std::size_t i = 0; i < 100; ++i){
+ tstamp = i;
+ mq1.send(&tstamp, sizeof(tstamp), (unsigned int)(9 - i%10));
+ }
+
+ priority_prev = (std::numeric_limits<unsigned int>::max)();
+ tstamp_prev = 0;
+
+ //Receive all messages and test those are ordered
+ //by priority and by FIFO in the same priority
+ for(std::size_t i = 0; i < 100; ++i){
+ mq1.receive(&tstamp, sizeof(tstamp), recvd, priority);
+ if(priority > priority_prev)
+ return false;
+ if(priority == priority_prev &&
+ tstamp <= tstamp_prev){
+ return false;
+ }
+ priority_prev = priority;
+ tstamp_prev = tstamp;
+ }
+ }
+ message_queue::remove(test::get_process_id_name());
+ return true;
+}
+
+//[message_queue_test_test_serialize_db
+//This test creates a in memory data-base using Interprocess machinery and
+//serializes it through a message queue. Then rebuilds the data-base in
+//another buffer and checks it against the original data-base
+bool test_serialize_db()
+{
+ //Typedef data to create a Interprocess map
+ typedef std::pair<const std::size_t, std::size_t> MyPair;
+ typedef std::less<std::size_t> MyLess;
+ typedef node_allocator<MyPair, managed_external_buffer::segment_manager>
+ node_allocator_t;
+ typedef map<std::size_t,
+ std::size_t,
+ std::less<std::size_t>,
+ node_allocator_t>
+ MyMap;
+
+ //Some constants
+ const std::size_t BufferSize = 65536;
+ const std::size_t MaxMsgSize = 100;
+
+ //Allocate a memory buffer to hold the destiny database using vector<char>
+ std::vector<char> buffer_destiny(BufferSize, 0);
+
+ message_queue::remove(test::get_process_id_name());
+ {
+ //Create the message-queues
+ message_queue mq1(create_only, test::get_process_id_name(), 1, MaxMsgSize);
+
+ //Open previously created message-queue simulating other process
+ message_queue mq2(open_only, test::get_process_id_name());
+
+ //A managed heap memory to create the origin database
+ managed_heap_memory db_origin(buffer_destiny.size());
+
+ //Construct the map in the first buffer
+ MyMap *map1 = db_origin.construct<MyMap>("MyMap")
+ (MyLess(),
+ db_origin.get_segment_manager());
+ if(!map1)
+ return false;
+
+ //Fill map1 until is full
+ try{
+ std::size_t i = 0;
+ while(1){
+ (*map1)[i] = i;
+ ++i;
+ }
+ }
+ catch(boost::interprocess::bad_alloc &){}
+
+ //Data control data sending through the message queue
+ std::size_t sent = 0;
+ message_queue::size_type recvd = 0;
+ message_queue::size_type total_recvd = 0;
+ unsigned int priority;
+
+ //Send whole first buffer through the mq1, read it
+ //through mq2 to the second buffer
+ while(1){
+ //Send a fragment of buffer1 through mq1
+ std::size_t bytes_to_send = MaxMsgSize < (db_origin.get_size() - sent) ?
+ MaxMsgSize : (db_origin.get_size() - sent);
+ mq1.send( &static_cast<char*>(db_origin.get_address())[sent]
+ , bytes_to_send
+ , 0);
+ sent += bytes_to_send;
+ //Receive the fragment through mq2 to buffer_destiny
+ mq2.receive( &buffer_destiny[total_recvd]
+ , BufferSize - recvd
+ , recvd
+ , priority);
+ total_recvd += recvd;
+
+ //Check if we have received all the buffer
+ if(total_recvd == BufferSize){
+ break;
+ }
+ }
+
+ //The buffer will contain a copy of the original database
+ //so let's interpret the buffer with managed_external_buffer
+ managed_external_buffer db_destiny(open_only, &buffer_destiny[0], BufferSize);
+
+ //Let's find the map
+ std::pair<MyMap *, managed_external_buffer::size_type> ret = db_destiny.find<MyMap>("MyMap");
+ MyMap *map2 = ret.first;
+
+ //Check if we have found it
+ if(!map2){
+ return false;
+ }
+
+ //Check if it is a single variable (not an array)
+ if(ret.second != 1){
+ return false;
+ }
+
+ //Now let's compare size
+ if(map1->size() != map2->size()){
+ return false;
+ }
+
+ //Now let's compare all db values
+ MyMap::size_type num_elements = map1->size();
+ for(std::size_t i = 0; i < num_elements; ++i){
+ if((*map1)[i] != (*map2)[i]){
+ return false;
+ }
+ }
+
+ //Destroy maps from db-s
+ db_origin.destroy_ptr(map1);
+ db_destiny.destroy_ptr(map2);
+ }
+ message_queue::remove(test::get_process_id_name());
+ return true;
+}
+//]
+
+static const int MsgSize = 10;
+static const int NumMsg = 1000;
+static char msgsend [10];
+static char msgrecv [10];
+
+static boost::interprocess::message_queue *pmessage_queue;
+
+void receiver()
+{
+ boost::interprocess::message_queue::size_type recvd_size;
+ unsigned int priority;
+ int nummsg = NumMsg;
+
+ while(nummsg--){
+ pmessage_queue->receive(msgrecv, MsgSize, recvd_size, priority);
+ }
+}
+
+bool test_buffer_overflow()
+{
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ {
+ boost::movelib::unique_ptr<boost::interprocess::message_queue>
+ ptr(new boost::interprocess::message_queue
+ (create_only, test::get_process_id_name(), 10, 10));
+ pmessage_queue = ptr.get();
+
+ //Launch the receiver thread
+ boost::interprocess::ipcdetail::OS_thread_t thread;
+ boost::interprocess::ipcdetail::thread_launch(thread, &receiver);
+ boost::interprocess::ipcdetail::thread_yield();
+
+ int nummsg = NumMsg;
+
+ while(nummsg--){
+ pmessage_queue->send(msgsend, MsgSize, 0);
+ }
+
+ boost::interprocess::ipcdetail::thread_join(thread);
+ }
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ return true;
+}
+
+
+//////////////////////////////////////////////////////////////////////////////
+//
+// test_multi_sender_receiver is based on Alexander (aalutov's)
+// testcase for ticket #9221. Many thanks.
+//
+//////////////////////////////////////////////////////////////////////////////
+
+static boost::interprocess::message_queue *global_queue = 0;
+//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
+static const int MULTI_NUM_MSG_PER_SENDER = 10000;
+//Message queue message capacity
+static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
+//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
+static const int MULTI_THREAD_COUNT = 10;
+
+static void multisend()
+{
+ char buff;
+ for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
+ global_queue->send(&buff, 1, 0);
+ }
+ global_queue->send(&buff, 0, 0);
+ //std::cout<<"writer thread complete"<<std::endl;
+}
+
+static void multireceive()
+{
+ char buff;
+ size_t size;
+ int received_msgs = 0;
+ unsigned int priority;
+ do {
+ global_queue->receive(&buff, 1, size, priority);
+ ++received_msgs;
+ } while (size > 0);
+ --received_msgs;
+ //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
+}
+
+
+bool test_multi_sender_receiver()
+{
+ bool ret = true;
+ //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
+ try {
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ boost::interprocess::message_queue mq
+ (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
+ global_queue = &mq;
+ std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
+
+ //Launch senders receiver thread
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[i], &multisend);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[MULTI_THREAD_COUNT+i], &multireceive);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
+ boost::interprocess::ipcdetail::thread_join(threads[i]);
+ //std::cout << "Joined thread " << i << std::endl;
+ }
+ }
+ catch (std::exception &e) {
+ std::cout << "error " << e.what() << std::endl;
+ ret = false;
+ }
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ return ret;
+}
+
+
+int main ()
+{
+ if(!test_priority_order()){
+ return 1;
+ }
+
+ if(!test_serialize_db()){
+ return 1;
+ }
+
+ if(!test_buffer_overflow()){
+ return 1;
+ }
+
+ if(!test_multi_sender_receiver()){
+ return 1;
+ }
+
+ return 0;
+}
+
+#include <boost/interprocess/detail/config_end.hpp>