diff options
Diffstat (limited to 'compress.cc')
-rw-r--r-- | compress.cc | 55 |
1 files changed, 30 insertions, 25 deletions
diff --git a/compress.cc b/compress.cc index f055b03..7945cf0 100644 --- a/compress.cc +++ b/compress.cc @@ -48,11 +48,14 @@ void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ) { - int errcode = pthread_mutex_init( mutex, 0 ); - if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); } - - errcode = pthread_cond_init( cond, 0 ); + int errcode = pthread_cond_init( cond, 0 ); if( errcode ) { show_error( "pthread_cond_init", errcode ); fatal(); } + + if( mutex ) + { + errcode = pthread_mutex_init( mutex, 0 ); + if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); } + } } @@ -61,8 +64,11 @@ void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ) int errcode = pthread_cond_destroy( cond ); if( errcode ) { show_error( "pthread_cond_destroy", errcode ); fatal(); } - errcode = pthread_mutex_destroy( mutex ); - if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); } + if( mutex ) + { + errcode = pthread_mutex_destroy( mutex ); + if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); } + } } @@ -125,16 +131,16 @@ public: private: unsigned long long receive_id; // id assigned to next packet received unsigned long long deliver_id; // id of next packet to be delivered - Slot_tally slot_tally; + Slot_tally slot_tally; // limits the number of input packets std::queue< Packet * > packet_queue; std::vector< Packet * > circular_buffer; - int num_working; // Number of workers still running + int num_working; // number of workers still running const int num_slots; // max packets in circulation pthread_mutex_t imutex; pthread_cond_t iav_or_eof; // input packet available or splitter done pthread_mutex_t omutex; pthread_cond_t oav_or_exit; // output packet available or all workers exited - bool eof; // splitter done + bool eof; // splitter done public: Packet_courier( const int num_workers, const int slots ) @@ -148,6 +154,8 @@ public: ~Packet_courier() { xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); } + const Slot_tally & tally() const { return slot_tally; } + // make a packet with data received from splitter void receive_packet( uint8_t * const data, const int size ) { @@ -182,10 +190,9 @@ public: xunlock( &imutex ); if( ipacket == 0 ) { - // Notify muxer when last worker exits + // notify muxer when last worker exits xlock( &omutex ); - if( --num_working == 0 ) - xsignal( &oav_or_exit ); + if( --num_working == 0 ) xsignal( &oav_or_exit ); xunlock( &omutex ); } return ipacket; @@ -198,7 +205,7 @@ public: // id collision shouldn't happen if( circular_buffer[opacket->id%num_slots] != 0 ) internal_error( "id collision in collect_packet" ); - // Merge packet into circular buffer + // merge packet into circular buffer circular_buffer[opacket->id%num_slots] = opacket; if( opacket->id == deliver_id ) xsignal( &oav_or_exit ); xunlock( &omutex ); @@ -240,8 +247,6 @@ public: if( circular_buffer[i] != 0 ) return false; return true; } - - const Slot_tally & tally() const { return slot_tally; } }; @@ -267,10 +272,10 @@ extern "C" void * csplitter( void * arg ) for( bool first_post = true; ; first_post = false ) { uint8_t * data = new( std::nothrow ) uint8_t[data_size]; - if( data == 0 ) { pp( "not enough memory" ); fatal(); } + if( data == 0 ) { pp( "Not enough memory" ); fatal(); } const int size = readblock( infd, data, data_size ); if( size != data_size && errno ) - { pp(); show_error( "read error", errno ); fatal(); } + { pp(); show_error( "Read error", errno ); fatal(); } if( size > 0 || first_post ) // first packet can be empty { @@ -314,7 +319,7 @@ extern "C" void * cworker( void * arg ) const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size]; - if( new_data == 0 ) { pp( "not enough memory" ); fatal(); } + if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); } const int dict_size = std::max( LZ_min_dictionary_size(), std::min( dictionary_size, packet->size ) ); LZ_Encoder * const encoder = @@ -322,7 +327,7 @@ extern "C" void * cworker( void * arg ) if( !encoder || LZ_compress_errno( encoder ) != LZ_ok ) { if( !encoder || LZ_compress_errno( encoder ) == LZ_mem_error ) - pp( "not enough memory. Try a smaller dictionary size" ); + pp( "Not enough memory. Try a smaller dictionary size" ); else internal_error( "invalid argument to encoder" ); fatal(); @@ -386,7 +391,7 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) { const int wr = writeblock( outfd, opacket->data, opacket->size ); if( wr != opacket->size ) - { pp(); show_error( "write error", errno ); fatal(); } + { pp(); show_error( "Write error", errno ); fatal(); } } delete[] opacket->data; delete opacket; @@ -416,7 +421,7 @@ int compress( const int data_size, const int dictionary_size, pthread_t splitter_thread; int errcode = pthread_create( &splitter_thread, 0, csplitter, &splitter_arg ); if( errcode ) - { show_error( "can't create splitter thread", errcode ); fatal(); } + { show_error( "Can't create splitter thread", errcode ); fatal(); } Worker_arg worker_arg; worker_arg.courier = &courier; @@ -426,12 +431,12 @@ int compress( const int data_size, const int dictionary_size, pthread_t * worker_threads = new( std::nothrow ) pthread_t[num_workers]; if( worker_threads == 0 ) - { pp( "not enough memory" ); fatal(); } + { pp( "Not enough memory" ); fatal(); } for( int i = 0; i < num_workers; ++i ) { errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_arg ); if( errcode ) - { show_error( "can't create worker threads", errcode ); fatal(); } + { show_error( "Can't create worker threads", errcode ); fatal(); } } muxer( courier, pp, outfd ); @@ -440,13 +445,13 @@ int compress( const int data_size, const int dictionary_size, { errcode = pthread_join( worker_threads[i], 0 ); if( errcode ) - { show_error( "can't join worker threads", errcode ); fatal(); } + { show_error( "Can't join worker threads", errcode ); fatal(); } } delete[] worker_threads; worker_threads = 0; errcode = pthread_join( splitter_thread, 0 ); if( errcode ) - { show_error( "can't join splitter thread", errcode ); fatal(); } + { show_error( "Can't join splitter thread", errcode ); fatal(); } if( verbosity >= 1 ) { |