diff options
Diffstat (limited to 'compress.cc')
-rw-r--r-- | compress.cc | 101 |
1 files changed, 57 insertions, 44 deletions
diff --git a/compress.cc b/compress.cc index 7945cf0..cf0135a 100644 --- a/compress.cc +++ b/compress.cc @@ -1,6 +1,6 @@ /* Plzip - A parallel compressor compatible with lzip Copyright (C) 2009 Laszlo Ersek. - Copyright (C) 2009, 2010 Antonio Diaz Diaz. + Copyright (C) 2009, 2010, 2011, 2012 Antonio Diaz Diaz. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -46,63 +46,63 @@ #endif -void xinit( pthread_cond_t * cond, pthread_mutex_t * mutex ) +void xinit( pthread_mutex_t * const mutex ) { - int errcode = pthread_cond_init( cond, 0 ); - if( errcode ) { show_error( "pthread_cond_init", errcode ); fatal(); } + const int errcode = pthread_mutex_init( mutex, 0 ); + if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); } + } - if( mutex ) - { - errcode = pthread_mutex_init( mutex, 0 ); - if( errcode ) { show_error( "pthread_mutex_init", errcode ); fatal(); } - } +void xinit( pthread_cond_t * const cond ) + { + const int errcode = pthread_cond_init( cond, 0 ); + if( errcode ) { show_error( "pthread_cond_init", errcode ); fatal(); } } -void xdestroy( pthread_cond_t * cond, pthread_mutex_t * mutex ) +void xdestroy( pthread_mutex_t * const mutex ) { - int errcode = pthread_cond_destroy( cond ); - if( errcode ) { show_error( "pthread_cond_destroy", errcode ); fatal(); } + const int errcode = pthread_mutex_destroy( mutex ); + if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); } + } - if( mutex ) - { - errcode = pthread_mutex_destroy( mutex ); - if( errcode ) { show_error( "pthread_mutex_destroy", errcode ); fatal(); } - } +void xdestroy( pthread_cond_t * const cond ) + { + const int errcode = pthread_cond_destroy( cond ); + if( errcode ) { show_error( "pthread_cond_destroy", errcode ); fatal(); } } -void xlock( pthread_mutex_t * mutex ) +void xlock( pthread_mutex_t * const mutex ) { - int errcode = pthread_mutex_lock( mutex ); + const int errcode = pthread_mutex_lock( mutex ); if( errcode ) { show_error( "pthread_mutex_lock", errcode ); fatal(); } } -void xunlock( pthread_mutex_t * mutex ) +void xunlock( pthread_mutex_t * const mutex ) { - int errcode = pthread_mutex_unlock( mutex ); + const int errcode = pthread_mutex_unlock( mutex ); if( errcode ) { show_error( "pthread_mutex_unlock", errcode ); fatal(); } } -void xwait( pthread_cond_t * cond, pthread_mutex_t * mutex ) +void xwait( pthread_cond_t * const cond, pthread_mutex_t * const mutex ) { - int errcode = pthread_cond_wait( cond, mutex ); + const int errcode = pthread_cond_wait( cond, mutex ); if( errcode ) { show_error( "pthread_cond_wait", errcode ); fatal(); } } -void xsignal( pthread_cond_t * cond ) +void xsignal( pthread_cond_t * const cond ) { - int errcode = pthread_cond_signal( cond ); + const int errcode = pthread_cond_signal( cond ); if( errcode ) { show_error( "pthread_cond_signal", errcode ); fatal(); } } -void xbroadcast( pthread_cond_t * cond ) +void xbroadcast( pthread_cond_t * const cond ) { - int errcode = pthread_cond_broadcast( cond ); + const int errcode = pthread_cond_broadcast( cond ); if( errcode ) { show_error( "pthread_cond_broadcast", errcode ); fatal(); } } @@ -142,17 +142,26 @@ private: pthread_cond_t oav_or_exit; // output packet available or all workers exited bool eof; // splitter done + Packet_courier( const Packet_courier & ); // declared as private + void operator=( const Packet_courier & ); // declared as private + public: - Packet_courier( const int num_workers, const int slots ) + Packet_courier( const int workers, const int slots ) : icheck_counter( 0 ), iwait_counter( 0 ), ocheck_counter( 0 ), owait_counter( 0 ), receive_id( 0 ), deliver_id( 0 ), slot_tally( slots ), circular_buffer( slots, (Packet *) 0 ), - num_working( num_workers ), num_slots( slots ), eof( false ) - { xinit( &iav_or_eof, &imutex ); xinit( &oav_or_exit, &omutex ); } + num_working( workers ), num_slots( slots ), eof( false ) + { + xinit( &imutex ); xinit( &iav_or_eof ); + xinit( &omutex ); xinit( &oav_or_exit ); + } ~Packet_courier() - { xdestroy( &iav_or_eof, &imutex ); xdestroy( &oav_or_exit, &omutex ); } + { + xdestroy( &oav_or_exit ); xdestroy( &omutex ); + xdestroy( &iav_or_eof ); xdestroy( &imutex ); + } const Slot_tally & tally() const { return slot_tally; } @@ -271,7 +280,7 @@ extern "C" void * csplitter( void * arg ) for( bool first_post = true; ; first_post = false ) { - uint8_t * data = new( std::nothrow ) uint8_t[data_size]; + uint8_t * const data = new( std::nothrow ) uint8_t[data_size]; if( data == 0 ) { pp( "Not enough memory" ); fatal(); } const int size = readblock( infd, data, data_size ); if( size != data_size && errno ) @@ -281,14 +290,15 @@ extern "C" void * csplitter( void * arg ) { in_size += size; courier.receive_packet( data, size ); + if( size < data_size ) break; // EOF } else { delete[] data; - courier.finish(); // no more packets to send break; } } + courier.finish(); // no more packets to send return 0; } @@ -314,11 +324,11 @@ extern "C" void * cworker( void * arg ) while( true ) { - Packet * packet = courier.distribute_packet(); + Packet * const packet = courier.distribute_packet(); if( packet == 0 ) break; // no more packets to process - const int compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); - uint8_t * const new_data = new( std::nothrow ) uint8_t[compr_size]; + const int max_compr_size = 42 + packet->size + ( ( packet->size + 7 ) / 8 ); + uint8_t * const new_data = new( std::nothrow ) uint8_t[max_compr_size]; if( new_data == 0 ) { pp( "Not enough memory" ); fatal(); } const int dict_size = std::max( LZ_min_dictionary_size(), std::min( dictionary_size, packet->size ) ); @@ -346,10 +356,11 @@ extern "C" void * cworker( void * arg ) if( wr < 0 ) internal_error( "library error (LZ_compress_write)" ); written += wr; } - if( written >= packet->size ) LZ_compress_finish( encoder ); + if( written >= packet->size ) + { delete[] packet->data; LZ_compress_finish( encoder ); } } const int rd = LZ_compress_read( encoder, new_data + new_size, - compr_size - new_size ); + max_compr_size - new_size ); if( rd < 0 ) { pp(); @@ -359,7 +370,7 @@ extern "C" void * cworker( void * arg ) fatal(); } new_size += rd; - if( new_size > compr_size ) + if( new_size > max_compr_size ) internal_error( "packet size exceeded in worker" ); if( LZ_compress_finished( encoder ) == 1 ) break; } @@ -367,7 +378,6 @@ extern "C" void * cworker( void * arg ) if( LZ_compress_close( encoder ) < 0 ) { pp( "LZ_compress_close failed" ); fatal(); } - delete[] packet->data; packet->data = new_data; packet->size = new_size; courier.collect_packet( packet ); @@ -405,12 +415,15 @@ void muxer( Packet_courier & courier, const Pretty_print & pp, const int outfd ) // call the muxer. int compress( const int data_size, const int dictionary_size, const int match_len_limit, const int num_workers, - const int num_slots, const int infd, const int outfd, + const int infd, const int outfd, const Pretty_print & pp, const int debug_level ) { + const int slots_per_worker = 2; + const int num_slots = ( ( INT_MAX / num_workers >= slots_per_worker ) ? + num_workers * slots_per_worker : INT_MAX ); in_size = 0; out_size = 0; - Packet_courier courier( num_workers, num_slots ); + Packet_courier courier( num_workers, num_slots - 1 ); Splitter_arg splitter_arg; splitter_arg.courier = &courier; @@ -434,7 +447,7 @@ int compress( const int data_size, const int dictionary_size, { pp( "Not enough memory" ); fatal(); } for( int i = 0; i < num_workers; ++i ) { - errcode = pthread_create( &worker_threads[i], 0, cworker, &worker_arg ); + errcode = pthread_create( worker_threads + i, 0, cworker, &worker_arg ); if( errcode ) { show_error( "Can't create worker threads", errcode ); fatal(); } } @@ -456,7 +469,7 @@ int compress( const int data_size, const int dictionary_size, if( verbosity >= 1 ) { if( in_size <= 0 || out_size <= 0 ) - std::fprintf( stderr, "no data compressed.\n" ); + std::fprintf( stderr, " no data compressed.\n" ); else std::fprintf( stderr, "%6.3f:1, %6.3f bits/byte, " "%5.2f%% saved, %lld in, %lld out.\n", |