diff options
Diffstat (limited to '')
-rw-r--r-- | dec_stdout.cc | 121 |
1 files changed, 61 insertions, 60 deletions
diff --git a/dec_stdout.cc b/dec_stdout.cc index 6ffed07..d32f7a8 100644 --- a/dec_stdout.cc +++ b/dec_stdout.cc @@ -46,10 +46,10 @@ struct Packet // data block uint8_t * data; // data may be null if size == 0 int size; // number of bytes in data (if any) bool eom; // end of member - Packet() : data( 0 ), size( 0 ), eom( true ) {} + Packet() : data( 0 ), size( 0 ), eom( false ) {} Packet( uint8_t * const d, const int s, const bool e ) : data( d ), size( s ), eom ( e ) {} - ~Packet() { if( data ) delete[] data; } + void delete_data() { if( data ) { delete[] data; data = 0; } } }; @@ -59,8 +59,8 @@ public: unsigned ocheck_counter; unsigned owait_counter; private: - int deliver_worker_id; // worker queue currently delivering packets - std::vector< std::queue< const Packet * > > opacket_queues; + int deliver_id; // worker queue currently delivering packets + std::vector< std::queue< Packet > > opacket_queues; int num_working; // number of workers still running const int num_workers; // number of workers const unsigned out_slots; // max output packets per queue @@ -75,10 +75,9 @@ private: public: Packet_courier( const Shared_retval & sh_ret, const int workers, const int slots ) - : ocheck_counter( 0 ), owait_counter( 0 ), deliver_worker_id( 0 ), - opacket_queues( workers ), num_working( workers ), - num_workers( workers ), out_slots( slots ), slot_av( workers ), - shared_retval( sh_ret ) + : ocheck_counter( 0 ), owait_counter( 0 ), deliver_id( 0 ), + opacket_queues( workers ), num_working( workers ), num_workers( workers ), + out_slots( slots ), slot_av( workers ), shared_retval( sh_ret ) { xinit_mutex( &omutex ); xinit_cond( &oav_or_exit ); for( unsigned i = 0; i < slot_av.size(); ++i ) xinit_cond( &slot_av[i] ); @@ -89,7 +88,7 @@ public: if( shared_retval() ) // cleanup to avoid memory leaks for( int i = 0; i < num_workers; ++i ) while( !opacket_queues[i].empty() ) - { delete opacket_queues[i].front(); opacket_queues[i].pop(); } + { opacket_queues[i].front().delete_data(); opacket_queues[i].pop(); } for( unsigned i = 0; i < slot_av.size(); ++i ) xdestroy_cond( &slot_av[i] ); xdestroy_cond( &oav_or_exit ); xdestroy_mutex( &omutex ); } @@ -102,49 +101,47 @@ public: xunlock( &omutex ); } - // collect a packet from a worker, discard packet on error - void collect_packet( const Packet * const opacket, const int worker_id ) + // make a packet with data received from a worker, discard data on error + void collect_packet( const int worker_id, uint8_t * const data, + const int size, const bool eom ) { + Packet opacket( data, size, eom ); xlock( &omutex ); - if( opacket->data ) + if( data ) while( opacket_queues[worker_id].size() >= out_slots ) { - if( shared_retval() ) { delete opacket; goto done; } + if( shared_retval() ) { delete[] data; goto out; } xwait( &slot_av[worker_id], &omutex ); } opacket_queues[worker_id].push( opacket ); - if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); -done: - xunlock( &omutex ); + if( worker_id == deliver_id ) xsignal( &oav_or_exit ); +out: xunlock( &omutex ); } - /* deliver a packet to muxer - if packet->eom, move to next queue - if packet data == 0, wait again */ - const Packet * deliver_packet() + /* deliver packets to muxer + if opacket.eom, move to next queue + if opacket.data == 0, skip opacket */ + void deliver_packets( std::vector< Packet > & packet_vector ) { - const Packet * opacket = 0; + packet_vector.clear(); xlock( &omutex ); ++ocheck_counter; - while( true ) - { - while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + do { + while( opacket_queues[deliver_id].empty() && num_working > 0 ) + { ++owait_counter; xwait( &oav_or_exit, &omutex ); } + while( true ) { - ++owait_counter; - xwait( &oav_or_exit, &omutex ); + if( opacket_queues[deliver_id].empty() ) break; + Packet opacket = opacket_queues[deliver_id].front(); + opacket_queues[deliver_id].pop(); + if( opacket_queues[deliver_id].size() + 1 == out_slots ) + xsignal( &slot_av[deliver_id] ); + if( opacket.eom && ++deliver_id >= num_workers ) deliver_id = 0; + if( opacket.data ) packet_vector.push_back( opacket ); } - if( opacket_queues[deliver_worker_id].empty() ) break; - opacket = opacket_queues[deliver_worker_id].front(); - opacket_queues[deliver_worker_id].pop(); - if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) - xsignal( &slot_av[deliver_worker_id] ); - if( opacket->eom && ++deliver_worker_id >= num_workers ) - deliver_worker_id = 0; - if( opacket->data ) break; - delete opacket; opacket = 0; } + while( packet_vector.empty() && num_working > 0 ); xunlock( &omutex ); - return opacket; } bool finished() // all packets delivered to muxer @@ -163,9 +160,14 @@ struct Worker_arg Packet_courier * courier; const Pretty_print * pp; Shared_retval * shared_retval; - int worker_id; - int num_workers; int infd; + int num_workers; + int worker_id; + void assign( const Lzip_index & li, Packet_courier & co, + const Pretty_print & pp_, Shared_retval & sr, + const int ifd, const int nw, const int wi ) + { lzip_index = &li; courier = &co; pp = &pp_; shared_retval = &sr; + infd = ifd; num_workers = nw; worker_id = wi; } }; @@ -179,9 +181,9 @@ extern "C" void * dworker_o( void * arg ) Packet_courier & courier = *tmp.courier; const Pretty_print & pp = *tmp.pp; Shared_retval & shared_retval = *tmp.shared_retval; - const int worker_id = tmp.worker_id; - const int num_workers = tmp.num_workers; const int infd = tmp.infd; + const int num_workers = tmp.num_workers; + const int worker_id = tmp.worker_id; const int buffer_size = 65536; int new_pos = 0; @@ -231,12 +233,11 @@ extern "C" void * dworker_o( void * arg ) const bool eom = LZ_decompress_finished( decoder ) == 1; if( new_pos == max_packet_size || eom ) // make data packet { - const Packet * const opacket = - new Packet( ( new_pos > 0 ) ? new_data : 0, new_pos, eom ); - courier.collect_packet( opacket, worker_id ); + courier.collect_packet( worker_id, ( new_pos > 0 ) ? new_data : 0, + new_pos, eom ); if( new_pos > 0 ) { new_pos = 0; new_data = 0; } if( eom ) - { LZ_decompress_reset( decoder ); // prepare for new member + { LZ_decompress_reset( decoder ); // prepare for next member break; } } if( rd == 0 ) break; @@ -262,23 +263,28 @@ done: void muxer( Packet_courier & courier, const Pretty_print & pp, Shared_retval & shared_retval, const int outfd ) { + std::vector< Packet > packet_vector; while( true ) { - const Packet * const opacket = courier.deliver_packet(); - if( !opacket ) break; // queue is empty. all workers exited - - if( shared_retval() == 0 && - writeblock( outfd, opacket->data, opacket->size ) != opacket->size && - shared_retval.set_value( 1 ) ) - { pp(); show_error( "Write error", errno ); } - delete opacket; + courier.deliver_packets( packet_vector ); + if( packet_vector.empty() ) break; // queue is empty. all workers exited + + for( unsigned i = 0; i < packet_vector.size(); ++i ) + { + Packet & opacket = packet_vector[i]; + if( shared_retval() == 0 && + writeblock( outfd, opacket.data, opacket.size ) != opacket.size && + shared_retval.set_value( 1 ) ) + { pp(); show_error( write_error_msg, errno ); } + opacket.delete_data(); + } } } } // end namespace -// init the courier, then start the workers and call the muxer. +// init the courier, then start the workers and call the muxer int dec_stdout( const int num_workers, const int infd, const int outfd, const Pretty_print & pp, const int debug_level, const int out_slots, const Lzip_index & lzip_index ) @@ -294,13 +300,8 @@ int dec_stdout( const int num_workers, const int infd, const int outfd, int i = 0; // number of workers started for( ; i < num_workers; ++i ) { - worker_args[i].lzip_index = &lzip_index; - worker_args[i].courier = &courier; - worker_args[i].pp = &pp; - worker_args[i].shared_retval = &shared_retval; - worker_args[i].worker_id = i; - worker_args[i].num_workers = num_workers; - worker_args[i].infd = infd; + worker_args[i].assign( lzip_index, courier, pp, shared_retval, infd, + num_workers, i ); const int errcode = pthread_create( &worker_threads[i], 0, dworker_o, &worker_args[i] ); if( errcode ) |