// Copyright (C) 2006 Davis E. King (davis@dlib.net) // License: Boost Software License See LICENSE.txt for the full license. #include #include #include #include #include #include #include "tester.h" namespace { using namespace test; using namespace dlib; using namespace std; logger dlog("test.pipe"); namespace pipe_kernel_test_helpers { const unsigned long proc1_count = 10000; dlib::mutex m; signaler s(m); unsigned long threads_running = 0; bool found_error; inline void add_running_thread ( ) { auto_mutex M(m); ++threads_running; } inline void remove_running_thread ( ) { auto_mutex M(m); --threads_running; s.broadcast(); } inline void wait_for_threads ( ) { auto_mutex M(m); while (threads_running > 0) s.wait(); } template < typename pipe > void threadproc1 ( void* param ) { add_running_thread(); pipe& p = *static_cast(param); try { int last = -1; for (unsigned long i = 0; i < proc1_count; ++i) { int cur=0; DLIB_TEST(p.dequeue(cur) == true); DLIB_TEST(last + 1 == cur); last = cur; } DLIB_TEST(p.size() == 0); } catch(exception& e) { auto_mutex M(m); found_error = true; cout << "\n\nERRORS FOUND" << endl; cout << e.what() << endl; dlog << LWARN << "ERRORS FOUND"; dlog << LWARN << e.what(); p.disable(); } remove_running_thread(); } template < typename pipe > void threadproc2 ( void* param ) { add_running_thread(); pipe& p = *static_cast(param); try { int last = -1; int cur; while (p.dequeue(cur)) { DLIB_TEST(last < cur); last = cur; } auto_mutex M(m); } catch(exception& e) { auto_mutex M(m); found_error = true; cout << "\n\nERRORS FOUND" << endl; cout << e.what() << endl; dlog << LWARN << "ERRORS FOUND"; dlog << LWARN << e.what(); p.disable(); } remove_running_thread(); } template < typename pipe > void threadproc3 ( void* param ) { add_running_thread(); pipe& p = *static_cast(param); try { int last = -1; int cur; while (p.dequeue_or_timeout(cur,100000)) { DLIB_TEST(last < cur); last = cur; } auto_mutex M(m); } catch(exception& e) { auto_mutex M(m); found_error = true; cout << "\n\nERRORS FOUND" << endl; cout << e.what() << endl; dlog << LWARN << "ERRORS FOUND"; dlog << LWARN << e.what(); p.disable(); } remove_running_thread(); } } // ---------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------- template class PipelineProcessor : private dlib::threaded_object { public: PipelineProcessor( dlib::pipe & in, dlib::pipe & out) : InPipe(in), OutPipe(out), InMsg(), OutMsg() { start(); } ~PipelineProcessor() { // signal the thread to stop stop(); wait(); } private: dlib::pipe & InPipe; dlib::pipe & OutPipe; in_type InMsg; out_type OutMsg; void thread() { while (!should_stop()) { if(InPipe.dequeue_or_timeout(InMsg, 100)) { // if function signals ready to send OutMsg while (!OutPipe.enqueue_or_timeout(OutMsg, 100)) { // try to send until should stop if (should_stop()) { return; } } } } }; }; void do_zero_size_test_with_timeouts() { dlog << LINFO << "in do_zero_size_test_with_timeouts()"; // make sure we can get though this without deadlocking for (int k = 0; k < 10; ++k) { dlib::pipe in_pipe(10); dlib::pipe out_pipe(0); { PipelineProcessor pp(in_pipe, out_pipe); int in = 1; in_pipe.enqueue(in); in = 2; in_pipe.enqueue(in); in = 3; in_pipe.enqueue(in); // sleep to make sure thread enqueued dlib::sleep(100); float out = 1.0f; out_pipe.dequeue(out); dlib::sleep(100); } print_spinner(); } } // ---------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------- template < typename pipe > void pipe_kernel_test ( ) /*! requires - pipe is an implementation of pipe/pipe_kernel_abstract.h and is instantiated with int ensures - runs tests on pipe for compliance with the specs !*/ { using namespace pipe_kernel_test_helpers; found_error = false; print_spinner(); pipe test(10), test2(100); pipe test_0(0), test2_0(0); pipe test_1(1), test2_1(1); DLIB_TEST(test.size() == 0); DLIB_TEST(test2.size() == 0); DLIB_TEST(test_0.size() == 0); DLIB_TEST(test2_0.size() == 0); DLIB_TEST(test_1.size() == 0); DLIB_TEST(test2_1.size() == 0); DLIB_TEST(test.is_enqueue_enabled() == true); DLIB_TEST(test.is_dequeue_enabled() == true); DLIB_TEST(test.is_enabled() == true); test.empty(); test2.empty(); DLIB_TEST(test.size() == 0); DLIB_TEST(test2.size() == 0); test_0.empty(); test2_0.empty(); DLIB_TEST(test_0.size() == 0); DLIB_TEST(test2_0.size() == 0); test_1.empty(); test2_1.empty(); DLIB_TEST(test_1.size() == 0); DLIB_TEST(test2_1.size() == 0); int a; a = 3; test.enqueue(a); DLIB_TEST(test.size() == 1); a = 5; test.enqueue(a); DLIB_TEST(test.size() == 2); a = 0; test.dequeue(a); DLIB_TEST(a == 3); DLIB_TEST(test.size() == 1); a = 0; test.dequeue(a); DLIB_TEST(a == 5); DLIB_TEST(test.size() == 0); print_spinner(); { dlog << LINFO << "starting normal length pipe tests"; create_new_thread(&threadproc1,&test); create_new_thread(&threadproc2,&test2); create_new_thread(&threadproc2,&test2); create_new_thread(&threadproc2,&test2); for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test.enqueue(a); } DLIB_TEST(test.is_enqueue_enabled() == true); test.disable_enqueue(); DLIB_TEST(test.is_enqueue_enabled() == false); for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test.enqueue(a); } for (unsigned long i = 0; i < 100000; ++i) { a = i; if (i%2 == 0) test2.enqueue(a); else test2.enqueue_or_timeout(a,100000); } test2.wait_for_num_blocked_dequeues(3); DLIB_TEST(test2.size() == 0); test2.disable(); wait_for_threads(); DLIB_TEST(test2.size() == 0); test2.enable(); print_spinner(); create_new_thread(&threadproc3,&test2); create_new_thread(&threadproc3,&test2); for (unsigned long i = 0; i < 100000; ++i) { a = i; if (i%2 == 0) test2.enqueue(a); else test2.enqueue_or_timeout(a,100000); } test2.wait_for_num_blocked_dequeues(2); DLIB_TEST(test2.size() == 0); test2.disable(); wait_for_threads(); DLIB_TEST(test2.size() == 0); } print_spinner(); { dlog << LINFO << "starting 0 length pipe tests"; create_new_thread(&threadproc1,&test_0); create_new_thread(&threadproc2,&test2_0); create_new_thread(&threadproc2,&test2_0); create_new_thread(&threadproc2,&test2_0); dlog << LTRACE << "0: 1"; for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test_0.enqueue(a); } dlog << LTRACE << "0: 2"; DLIB_TEST(test_0.is_enqueue_enabled() == true); test_0.disable_enqueue(); DLIB_TEST(test_0.is_enqueue_enabled() == false); for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test_0.enqueue(a); } dlog << LTRACE << "0: 3"; for (unsigned long i = 0; i < 100000; ++i) { a = i; if (i%2 == 0) test2_0.enqueue(a); else test2_0.enqueue_or_timeout(a,100000); } print_spinner(); dlog << LTRACE << "0: 4"; test2_0.wait_for_num_blocked_dequeues(3); DLIB_TEST(test2_0.size() == 0); test2_0.disable(); wait_for_threads(); DLIB_TEST(test2_0.size() == 0); dlog << LTRACE << "0: 5"; test2_0.enable(); create_new_thread(&threadproc3,&test2_0); create_new_thread(&threadproc3,&test2_0); for (unsigned long i = 0; i < 20000; ++i) { if ((i%100) == 0) print_spinner(); a = i; if (i%2 == 0) test2_0.enqueue(a); else test2_0.enqueue_or_timeout(a,100000); } dlog << LTRACE << "0: 6"; test2_0.wait_for_num_blocked_dequeues(2); DLIB_TEST(test2_0.size() == 0); test2_0.disable(); wait_for_threads(); DLIB_TEST(test2_0.size() == 0); dlog << LTRACE << "0: 7"; } print_spinner(); { dlog << LINFO << "starting 1 length pipe tests"; create_new_thread(&threadproc1,&test_1); create_new_thread(&threadproc2,&test2_1); create_new_thread(&threadproc2,&test2_1); create_new_thread(&threadproc2,&test2_1); for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test_1.enqueue(a); } DLIB_TEST(test_1.is_enqueue_enabled() == true); test_1.disable_enqueue(); DLIB_TEST(test_1.is_enqueue_enabled() == false); for (unsigned long i = 0; i < proc1_count; ++i) { a = i; test_1.enqueue(a); } print_spinner(); for (unsigned long i = 0; i < 100000; ++i) { a = i; if (i%2 == 0) test2_1.enqueue(a); else test2_1.enqueue_or_timeout(a,100000); } test2_1.wait_for_num_blocked_dequeues(3); DLIB_TEST(test2_1.size() == 0); test2_1.disable(); wait_for_threads(); DLIB_TEST(test2_1.size() == 0); test2_1.enable(); create_new_thread(&threadproc3,&test2_1); create_new_thread(&threadproc3,&test2_1); for (unsigned long i = 0; i < 100000; ++i) { a = i; if (i%2 == 0) test2_1.enqueue(a); else test2_1.enqueue_or_timeout(a,100000); } test2_1.wait_for_num_blocked_dequeues(2); DLIB_TEST(test2_1.size() == 0); test2_1.disable(); wait_for_threads(); DLIB_TEST(test2_1.size() == 0); } test.enable_enqueue(); test_0.enable_enqueue(); test_1.enable_enqueue(); DLIB_TEST(test.is_enabled()); DLIB_TEST(test.is_enqueue_enabled()); DLIB_TEST(test_0.is_enabled()); DLIB_TEST(test_0.is_enqueue_enabled()); DLIB_TEST(test_1.is_enabled()); DLIB_TEST(test_1.is_enqueue_enabled()); DLIB_TEST(test.size() == 0); DLIB_TEST(test_0.size() == 0); DLIB_TEST(test_1.size() == 0); DLIB_TEST(test.max_size() == 10); DLIB_TEST(test_0.max_size() == 0); DLIB_TEST(test_1.max_size() == 1); for (int i = 0; i < 100; ++i) { a = 1; test.enqueue_or_timeout(a,0); a = 1; test_0.enqueue_or_timeout(a,0); a = 1; test_1.enqueue_or_timeout(a,0); } DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() ); DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() ); DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() ); for (int i = 0; i < 10; ++i) { a = 0; DLIB_TEST(test.enqueue_or_timeout(a,10) == false); a = 0; DLIB_TEST(test_0.enqueue_or_timeout(a,10) == false); a = 0; DLIB_TEST(test_1.enqueue_or_timeout(a,10) == false); } DLIB_TEST_MSG(test.size() == 10,"size: " << test.size() ); DLIB_TEST_MSG(test_0.size() == 0,"size: " << test.size() ); DLIB_TEST_MSG(test_1.size() == 1,"size: " << test.size() ); for (int i = 0; i < 10; ++i) { a = 0; DLIB_TEST(test.dequeue_or_timeout(a,0) == true); DLIB_TEST(a == 1); } DLIB_TEST(test.max_size() == 10); DLIB_TEST(test_0.max_size() == 0); DLIB_TEST(test_1.max_size() == 1); a = 0; DLIB_TEST(test_1.dequeue_or_timeout(a,0) == true); DLIB_TEST(test.max_size() == 10); DLIB_TEST(test_0.max_size() == 0); DLIB_TEST(test_1.max_size() == 1); DLIB_TEST_MSG(a == 1,"a: " << a); DLIB_TEST(test.size() == 0); DLIB_TEST(test_0.size() == 0); DLIB_TEST(test_1.size() == 0); DLIB_TEST(test.dequeue_or_timeout(a,0) == false); DLIB_TEST(test_0.dequeue_or_timeout(a,0) == false); DLIB_TEST(test_1.dequeue_or_timeout(a,0) == false); DLIB_TEST(test.dequeue_or_timeout(a,10) == false); DLIB_TEST(test_0.dequeue_or_timeout(a,10) == false); DLIB_TEST(test_1.dequeue_or_timeout(a,10) == false); DLIB_TEST(test.size() == 0); DLIB_TEST(test_0.size() == 0); DLIB_TEST(test_1.size() == 0); DLIB_TEST(found_error == false); { test.enable(); test.enable_enqueue(); test.empty(); DLIB_TEST(test.size() == 0); DLIB_TEST(test.is_enabled() == true); DLIB_TEST(test.is_enqueue_enabled() == true); DLIB_TEST(test.is_dequeue_enabled() == true); test.disable_dequeue(); dlog << LINFO << "Make sure disable_dequeue() works right..."; DLIB_TEST(test.is_dequeue_enabled() == false); DLIB_TEST(test.dequeue(a) == false); test.wait_until_empty(); a = 4; test.enqueue(a); test.wait_until_empty(); test.wait_for_num_blocked_dequeues(4); DLIB_TEST(test.size() == 1); DLIB_TEST(test.dequeue(a) == false); DLIB_TEST(test.dequeue_or_timeout(a,10000) == false); DLIB_TEST(test.size() == 1); a = 0; test.enable_dequeue(); DLIB_TEST(test.is_dequeue_enabled() == true); DLIB_TEST(test.dequeue(a) == true); DLIB_TEST(a == 4); test_1.wait_until_empty(); } { test_1.enable(); test_1.enable_enqueue(); test_1.empty(); DLIB_TEST(test_1.size() == 0); DLIB_TEST(test_1.is_enabled() == true); DLIB_TEST(test_1.is_enqueue_enabled() == true); DLIB_TEST(test_1.is_dequeue_enabled() == true); test_1.disable_dequeue(); dlog << LINFO << "Make sure disable_dequeue() works right..."; DLIB_TEST(test_1.is_dequeue_enabled() == false); DLIB_TEST(test_1.dequeue(a) == false); a = 4; test_1.wait_for_num_blocked_dequeues(4); test_1.wait_for_num_blocked_dequeues(0); test_1.enqueue(a); test_1.wait_until_empty(); DLIB_TEST(test_1.size() == 1); DLIB_TEST(test_1.dequeue(a) == false); DLIB_TEST(test_1.dequeue_or_timeout(a,10000) == false); DLIB_TEST(test_1.size() == 1); a = 0; test_1.enable_dequeue(); DLIB_TEST(test_1.is_dequeue_enabled() == true); DLIB_TEST(test_1.dequeue(a) == true); DLIB_TEST(a == 4); test_1.wait_until_empty(); } } class pipe_tester : public tester { public: pipe_tester ( ) : tester ("test_pipe", "Runs tests on the pipe component.") {} void perform_test ( ) { pipe_kernel_test >(); do_zero_size_test_with_timeouts(); } } a; }