From 06eb4a907699aae4a22edb4d178b8f8e10c9d5d8 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 7 Nov 2015 16:18:51 +0100 Subject: Adding upstream version 0.6. Signed-off-by: Daniel Baumann --- decompress.cc | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'decompress.cc') diff --git a/decompress.cc b/decompress.cc index 85baaf6..a5d4994 100644 --- a/decompress.cc +++ b/decompress.cc @@ -28,8 +28,8 @@ #include #include #include +#include #include -#include #include #include @@ -111,6 +111,7 @@ public: { ++iwait_counter; xwait( &iav_or_eof, &imutex ); + ++icheck_counter; } if( !ipacket_queues[worker_id].empty() ) { @@ -153,6 +154,7 @@ public: { ++owait_counter; xwait( &oav_or_exit, &omutex ); + ++ocheck_counter; } if( opacket_queues[deliver_worker_id].empty() ) break; opacket = opacket_queues[deliver_worker_id].front(); @@ -201,7 +203,7 @@ struct Splitter_arg // split data from input file into chunks and pass them to // courier for packaging and distribution to workers. -void * splitter( void * arg ) +extern "C" void * dsplitter( void * arg ) { const Splitter_arg & tmp = *(Splitter_arg *)arg; Packet_courier & courier = *tmp.courier; @@ -286,7 +288,7 @@ struct Worker_arg // consume packets from courier, decompress their contents, and // give the produced packets to courier. -void * worker( void * arg ) +extern "C" void * dworker( void * arg ) { const Worker_arg & tmp = *(Worker_arg *)arg; Packet_courier & courier = *tmp.courier; @@ -414,7 +416,9 @@ int decompress( const int num_workers, const int num_slots, splitter_arg.packet_size = packet_size; pthread_t splitter_thread; - xcreate( &splitter_thread, splitter, &splitter_arg ); + int errcode = pthread_create( &splitter_thread, 0, dsplitter, &splitter_arg ); + if( errcode ) + { show_error( "can't create splitter thread", errcode ); fatal(); } Worker_arg * worker_args = new( std::nothrow ) Worker_arg[num_workers]; pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; @@ -426,17 +430,25 @@ int decompress( const int num_workers, const int num_slots, worker_args[i].pp = &pp; worker_args[i].worker_id = i; worker_args[i].packet_size = packet_size; - xcreate( &worker_threads[i], worker, &worker_args[i] ); + errcode = pthread_create( &worker_threads[i], 0, dworker, &worker_args[i] ); + if( errcode ) + { show_error( "can't create worker threads", errcode ); fatal(); } } muxer( courier, pp, outfd ); for( int i = num_workers - 1; i >= 0; --i ) - xjoin( worker_threads[i] ); + { + errcode = pthread_join( worker_threads[i], 0 ); + if( errcode ) + { show_error( "can't join worker threads", errcode ); fatal(); } + } delete[] worker_threads; worker_threads = 0; delete[] worker_args; worker_args = 0; - xjoin( splitter_thread ); + errcode = pthread_join( splitter_thread, 0 ); + if( errcode ) + { show_error( "can't join splitter thread", errcode ); fatal(); } if( verbosity >= 2 ) std::fprintf( stderr, "decompressed size %9lld, size %9lld. ", -- cgit v1.2.3