diff options
Diffstat (limited to 'create_lz.cc')
-rw-r--r-- | create_lz.cc | 88 |
1 files changed, 44 insertions, 44 deletions
diff --git a/create_lz.cc b/create_lz.cc index 5436bf5..b390290 100644 --- a/create_lz.cc +++ b/create_lz.cc @@ -22,7 +22,7 @@ #include <cstdio> #include <queue> #include <pthread.h> -#include <stdint.h> // for lzlib.h +#include <stdint.h> // for lzlib.h #include <unistd.h> #include <sys/stat.h> #include <ftw.h> @@ -60,7 +60,7 @@ public: ~Slot_tally() { xdestroy_cond( &slot_av ); xdestroy_mutex( &mutex ); } - bool all_free() { return ( num_free == num_slots ); } + bool all_free() { return num_free == num_slots; } void get_slot() // wait for a free slot { @@ -94,8 +94,8 @@ struct Ipacket // filename, file size and headers struct Opacket // compressed data to be written to the archive { - const uint8_t * const data; // data == 0 means end of lzip member - const int size; // number of bytes in data (if any) + const uint8_t * data; // data == 0 means end of lzip member + int size; // number of bytes in data (if any) Opacket() : data( 0 ), size( 0 ) {} Opacket( uint8_t * const d, const int s ) : data( d ), size( s ) {} @@ -110,11 +110,11 @@ public: unsigned ocheck_counter; unsigned owait_counter; private: - int receive_worker_id; // worker queue currently receiving packets - int deliver_worker_id; // worker queue currently delivering packets + int receive_id; // worker queue currently receiving packets + int deliver_id; // worker queue currently delivering packets Slot_tally slot_tally; // limits the number of input packets std::vector< std::queue< const Ipacket * > > ipacket_queues; - std::vector< std::queue< const Opacket * > > opacket_queues; + std::vector< std::queue< Opacket > > opacket_queues; int num_working; // number of workers still running const int num_workers; // number of workers const unsigned out_slots; // max output packets per queue @@ -132,11 +132,10 @@ public: Packet_courier( const int workers, const int in_slots, const int oslots ) : icheck_counter( 0 ), iwait_counter( 0 ), ocheck_counter( 0 ), owait_counter( 0 ), - receive_worker_id( 0 ), deliver_worker_id( 0 ), - slot_tally( in_slots ), ipacket_queues( workers ), - opacket_queues( workers ), num_working( workers ), - num_workers( workers ), out_slots( oslots ), slot_av( workers ), - eof( false ) + receive_id( 0 ), deliver_id( 0 ), slot_tally( in_slots ), + ipacket_queues( workers ), opacket_queues( workers ), + num_working( workers ), num_workers( workers ), + out_slots( oslots ), slot_av( workers ), eof( false ) { xinit_mutex( &imutex ); xinit_cond( &iav_or_eof ); xinit_mutex( &omutex ); xinit_cond( &oav_or_exit ); @@ -157,9 +156,9 @@ public: if( !ipacket->filename.empty() ) slot_tally.get_slot(); // wait for a free slot xlock( &imutex ); - ipacket_queues[receive_worker_id].push( ipacket ); - if( ipacket->filename.empty() && ++receive_worker_id >= num_workers ) - receive_worker_id = 0; + ipacket_queues[receive_id].push( ipacket ); + if( ipacket->filename.empty() && ++receive_id >= num_workers ) + receive_id = 0; xbroadcast( &iav_or_eof ); xunlock( &imutex ); } @@ -194,44 +193,41 @@ public: } // collect an opacket from a worker - void collect_packet( const Opacket * const opacket, const int worker_id ) + void collect_packet( const Opacket & opacket, const int worker_id ) { xlock( &omutex ); - if( opacket->data ) + if( opacket.data ) { while( opacket_queues[worker_id].size() >= out_slots ) xwait( &slot_av[worker_id], &omutex ); } opacket_queues[worker_id].push( opacket ); - if( worker_id == deliver_worker_id ) xsignal( &oav_or_exit ); + if( worker_id == deliver_id ) xsignal( &oav_or_exit ); xunlock( &omutex ); } - /* Deliver an opacket to muxer. - If opacket data == 0, move to next queue and wait again. */ - const Opacket * deliver_packet() + /* Deliver opackets to muxer. + If opacket.data == 0, skip opacket and move to next queue. */ + void deliver_packets( std::vector< Opacket > & opacket_vector ) { - const Opacket * opacket = 0; + opacket_vector.clear(); xlock( &omutex ); ++ocheck_counter; - while( true ) - { - while( opacket_queues[deliver_worker_id].empty() && num_working > 0 ) + do { + while( opacket_queues[deliver_id].empty() && num_working > 0 ) + { ++owait_counter; xwait( &oav_or_exit, &omutex ); } + while( !opacket_queues[deliver_id].empty() ) { - ++owait_counter; - xwait( &oav_or_exit, &omutex ); + Opacket opacket = opacket_queues[deliver_id].front(); + opacket_queues[deliver_id].pop(); + if( opacket_queues[deliver_id].size() + 1 == out_slots ) + xsignal( &slot_av[deliver_id] ); + if( opacket.data ) opacket_vector.push_back( opacket ); + else if( ++deliver_id >= num_workers ) deliver_id = 0; } - if( opacket_queues[deliver_worker_id].empty() ) break; - opacket = opacket_queues[deliver_worker_id].front(); - opacket_queues[deliver_worker_id].pop(); - if( opacket_queues[deliver_worker_id].size() + 1 == out_slots ) - xsignal( &slot_av[deliver_worker_id] ); - if( opacket->data ) break; - if( ++deliver_worker_id >= num_workers ) deliver_worker_id = 0; - delete opacket; opacket = 0; } + while( opacket_vector.empty() && num_working > 0 ); xunlock( &omutex ); - return opacket; } void finish() // grouper has no more packets to send @@ -371,7 +367,7 @@ void loop_encode( const uint8_t * const ibuf, const int isize, { if( opos > max_packet_size ) internal_error( "opacket size exceeded in worker." ); - courier.collect_packet( new Opacket( obuf, opos ), worker_id ); + courier.collect_packet( Opacket( obuf, opos ), worker_id ); opos = 0; obuf = new( std::nothrow ) uint8_t[max_packet_size]; if( !obuf ) { show_error( mem_msg2 ); exit_fail_mt(); } if( LZ_compress_finished( encoder ) == 1 ) @@ -421,7 +417,7 @@ extern "C" void * cworker( void * arg ) { if( !flushed ) // this lzip member is not empty loop_encode( 0, 0, data, opos, courier, encoder, worker_id, true ); - courier.collect_packet( new Opacket, worker_id ); // end of member token + courier.collect_packet( Opacket(), worker_id ); // end of member token flushed = true; delete ipacket; continue; } @@ -501,15 +497,19 @@ extern "C" void * cworker( void * arg ) */ void muxer( Packet_courier & courier, const int outfd ) { + std::vector< Opacket > opacket_vector; while( true ) { - const Opacket * const opacket = courier.deliver_packet(); - if( !opacket ) break; // queue is empty. all workers exited + courier.deliver_packets( opacket_vector ); + if( opacket_vector.empty() ) break; // queue is empty. all workers exited - if( !writeblock_wrapper( outfd, opacket->data, opacket->size ) ) - exit_fail_mt(); - delete[] opacket->data; - delete opacket; + for( unsigned i = 0; i < opacket_vector.size(); ++i ) + { + Opacket & opacket = opacket_vector[i]; + if( !writeblock_wrapper( outfd, opacket.data, opacket.size ) ) + exit_fail_mt(); + delete[] opacket.data; + } } } |